In [1]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.types import IntegerType, StringType, DoubleType, StructField, StructType

In [2]:
import sys
# Display the path of the Python interpreter backing this kernel, to confirm
# which environment the notebook is running in.
sys.executable

'c:\\Users\\Admin\\anaconda3\\envs\\myenv\\python.exe'

In [3]:
# Build (or reuse) a single-threaded local Spark session with the SQL Server
# JDBC driver jar registered for the driver, the executors and `spark.jars`.
# NOTE(review): the extraClassPath entries point at "../jars/" while
# `spark.jars` points at "./jars/" -- confirm which location actually holds
# sqljdbc41.jar and align the three settings.
spark = (
    SparkSession.builder
    .master("local[1]")
    .config("spark.driver.extraClassPath", "../jars/sqljdbc41.jar")
    .config("spark.executor.extraClassPath", "../jars/sqljdbc41.jar")
    .config("spark.jars", "./jars/sqljdbc41.jar")
    .appName("SparkETL.com")
    .getOrCreate()
)


In [4]:
# List every (key, value) pair of the active Spark configuration to verify the
# settings passed to the session builder.
spark.sparkContext.getConf().getAll()

[('spark.driver.extraJavaOptions',
  '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false'),
 ('spark.executor.id', 'driver'),
 ('spark.app.id', 'local-1715737824079'),
 ('spark.app.submitTime', '

In [5]:
# Schema applied to the SoundCloud user CSV: an integer ID followed by four
# string columns, all declared non-nullable.
source_schema = (
    StructType()
    .add("ID", IntegerType(), False)
    .add("USERNAME", StringType(), False)
    .add("NUMBER_FOLLOWS", StringType(), False)
    .add("NUMBER_TRACKS", StringType(), False)
    .add("LINK", StringType(), False)
)

In [6]:
# Load the comma-delimited CSV export with the explicit schema; the first line
# of the file is treated as a header.
source_df = (
    spark.read
    .options(delimiter=',')
    .schema(source_schema)
    .option("header", True)
    .csv("Soundcloud_User.csv")
)
source_df.show()

+---+--------------------+-------------------+-------------+--------------------+
| ID|            USERNAME|     NUMBER_FOLLOWS|NUMBER_TRACKS|                LINK|
+---+--------------------+-------------------+-------------+--------------------+
|  0|                   a|   14,973 followers|            0|https://soundclou...|
|  1|            Hoodadk4|   15,935 followers|            0|https://soundclou...|
|  2|           Sexyy Red|   45,021 followers|            0|https://soundclou...|
|  3|               4batz|   34,867 followers|            0|https://soundclou...|
|  4|         Sara Landry|   86,088 followers|            0|https://soundclou...|
|  5|                AKON|  238,447 followers|            0|https://soundclou...|
|  6|A BOOGIE WIT DA H...|1,587,348 followers|            0|https://soundclou...|
|  7|           21 Savage|1,671,201 followers|            0|https://soundclou...|
|  8|MC IG - 4M (CONTA...|    8,449 followers|            0|https://soundclou...|
|  9|      Nico 

In [7]:
from dotenv import load_dotenv, dotenv_values

# Parse the .env file once and reuse the resulting mapping, instead of
# re-reading the file for every single setting.
env_values = dotenv_values(".env")

# Database credentials and connection settings; each is None when the key is
# missing from .env.
USERNAME = env_values.get("USERNAME")
HOST = env_values.get("HOST")
CONNECTION_STRING = env_values.get("CONNECTION_STRING")
PASSWORD = env_values.get("PASSWORD")
TABLE_NAME = 'CustomerSource'

# JDBC URL of the SoundCloud database on the configured SQL Server host.
url = f"jdbc:sqlserver://{HOST}:1433;database=SoundCloud"

# Connection properties built from the same credentials loaded above.
properties = {
    "user": USERNAME,
    "password": PASSWORD,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

In [8]:
# Read the [dbo].[User] table from SQL Server through JDBC, authenticating with
# the credentials loaded from .env.
jdbc_read_options = {
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    "url": url,
    "dbtable": "[dbo].[User]",
    "user": USERNAME,
    "password": PASSWORD,
}
df = spark.read.format("jdbc").options(**jdbc_read_options).load()

  

In [9]:
# Preview the rows read back from the [dbo].[User] table.
df.show()

+---+--------------------+-------------+------------+--------------------+
| Id|            Username|NumberFollows|NumberTracks|                Link|
+---+--------------------+-------------+------------+--------------------+
|  0|                   a|        14973|           0|https://soundclou...|
|  1|            Hoodadk4|        15935|           0|https://soundclou...|
|  2|           Sexyy Red|        45021|           0|https://soundclou...|
|  3|               4batz|        34867|           0|https://soundclou...|
|  4|         Sara Landry|        86088|           0|https://soundclou...|
|  5|                AKON|       238447|           0|https://soundclou...|
|  6|A BOOGIE WIT DA H...|      1587348|           0|https://soundclou...|
|  7|           21 Savage|      1671201|           0|https://soundclou...|
|  8|MC IG - 4M (CONTA...|         8449|           0|https://soundclou...|
|  9|      Nico Morrisson|         9158|           0|https://soundclou...|
| 10| Federico Maniscalco

In [10]:
# Expand the CSV schema into its individual StructField entries for inspection.
list(source_schema)

[StructField('ID', IntegerType(), False),
 StructField('USERNAME', StringType(), False),
 StructField('NUMBER_FOLLOWS', StringType(), False),
 StructField('NUMBER_TRACKS', StringType(), False),
 StructField('LINK', StringType(), False)]

In [11]:
# Write the raw CSV rows to the SQL Server table named by TABLE_NAME, replacing
# whatever the table currently holds.
# The save mode is set on the writer itself: "mode" is not a JDBC data source
# option, so it is no longer passed through `.option(...)`.
(
    source_df.write
    .format("jdbc")
    .mode("overwrite")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("url", url)
    .option("dbtable", TABLE_NAME)
    .option("user", USERNAME)
    .option("password", PASSWORD)
    .save()
)

In [12]:
# Preview the CSV-backed frame again; NUMBER_FOLLOWS still holds the raw
# "<count> followers" text at this point.
source_df.show()

+---+--------------------+-------------------+-------------+--------------------+
| ID|            USERNAME|     NUMBER_FOLLOWS|NUMBER_TRACKS|                LINK|
+---+--------------------+-------------------+-------------+--------------------+
|  0|                   a|   14,973 followers|            0|https://soundclou...|
|  1|            Hoodadk4|   15,935 followers|            0|https://soundclou...|
|  2|           Sexyy Red|   45,021 followers|            0|https://soundclou...|
|  3|               4batz|   34,867 followers|            0|https://soundclou...|
|  4|         Sara Landry|   86,088 followers|            0|https://soundclou...|
|  5|                AKON|  238,447 followers|            0|https://soundclou...|
|  6|A BOOGIE WIT DA H...|1,587,348 followers|            0|https://soundclou...|
|  7|           21 Savage|1,671,201 followers|            0|https://soundclou...|
|  8|MC IG - 4M (CONTA...|    8,449 followers|            0|https://soundclou...|
|  9|      Nico 

In [13]:

from pyspark.sql.functions import col
from pyspark.sql.functions import split, regexp_replace


def extract_number_follow(value):
    """Return the text before the first space of a string column.

    Args:
        value: Column (or column name) holding text such as
            "14,973 followers".

    Returns:
        A Column with the first space-separated token, e.g. "14,973".
    """
    tokens = split(value, " ")
    return tokens.getItem(0)


# Add the extracted follower count as NEW_NUMBER_FOLLOWS.
# Note: this rebinds `df`, which previously held the frame read over JDBC.
df = source_df.withColumn(
    "NEW_NUMBER_FOLLOWS",
    extract_number_follow(source_df.NUMBER_FOLLOWS),
)
df.show()


+---+--------------------+-------------------+-------------+--------------------+------------------+
| ID|            USERNAME|     NUMBER_FOLLOWS|NUMBER_TRACKS|                LINK|NEW_NUMBER_FOLLOWS|
+---+--------------------+-------------------+-------------+--------------------+------------------+
|  0|                   a|   14,973 followers|            0|https://soundclou...|            14,973|
|  1|            Hoodadk4|   15,935 followers|            0|https://soundclou...|            15,935|
|  2|           Sexyy Red|   45,021 followers|            0|https://soundclou...|            45,021|
|  3|               4batz|   34,867 followers|            0|https://soundclou...|            34,867|
|  4|         Sara Landry|   86,088 followers|            0|https://soundclou...|            86,088|
|  5|                AKON|  238,447 followers|            0|https://soundclou...|           238,447|
|  6|A BOOGIE WIT DA H...|1,587,348 followers|            0|https://soundclou...|         1

In [14]:
# Expose the frame to Spark SQL under the temporary view name `soundcloud`.
df.createOrReplaceTempView("soundcloud")

In [15]:
# Remove the thousands separators from NEW_NUMBER_FOLLOWS and cast the result
# to INT, exposing it as DB_FOLLOWS alongside every existing column.
df_2 = spark.sql("""
    Select *, cast(replace(new_number_follows,',','') as INT) as DB_FOLLOWS
    from soundcloud

""")

# Register the result so later SQL cells can query it as `transformed_data`.
df_2.createOrReplaceTempView("transformed_data")

In [16]:
# Keep only the columns intended for the target table, using the numeric
# follower count in place of the raw text column.
destination = spark.sql("""SELECT username, db_follows, number_tracks, link from transformed_data """)

In [17]:
# Preview the projected frame.
destination.show()

+--------------------+----------+-------------+--------------------+
|            username|db_follows|number_tracks|                link|
+--------------------+----------+-------------+--------------------+
|                   a|     14973|            0|https://soundclou...|
|            Hoodadk4|     15935|            0|https://soundclou...|
|           Sexyy Red|     45021|            0|https://soundclou...|
|               4batz|     34867|            0|https://soundclou...|
|         Sara Landry|     86088|            0|https://soundclou...|
|                AKON|    238447|            0|https://soundclou...|
|A BOOGIE WIT DA H...|   1587348|            0|https://soundclou...|
|           21 Savage|   1671201|            0|https://soundclou...|
|MC IG - 4M (CONTA...|      8449|            0|https://soundclou...|
|      Nico Morrisson|      9158|            0|https://soundclou...|
| Federico Maniscalco|      8962|            0|https://soundclou...|
|            PK Sonik|      8067| 

In [18]:
# Inspect the resulting column types (db_follows should now be an integer).
destination.schema

StructType([StructField('username', StringType(), True), StructField('db_follows', IntegerType(), True), StructField('number_tracks', StringType(), True), StructField('link', StringType(), True)])

In [19]:
from packages.ExtractLoad import ExtractLoadFiletoDB
from packages.Transform import Transform
from dotenv import load_dotenv, dotenv_values
from pyspark.sql.types import IntegerType, StringType, DoubleType, StructField, StructType

# Schema applied to the source CSV. Unlike the exploratory schema used earlier
# in the notebook, NUMBER_TRACKS is read as an integer here.
source_schema = StructType([
        StructField("ID", IntegerType(), False),
        StructField("USERNAME", StringType(), False),
        StructField("NUMBER_FOLLOWS",StringType(), False),
        StructField("NUMBER_TRACKS", IntegerType(), False),
        StructField("LINK", StringType(), False)])

FILEPATH = "Soundcloud_User.csv"

# Parse the .env file once and reuse the mapping for every setting, instead of
# re-reading the file per value. Each setting is None when its key is missing.
env_values = dotenv_values(".env")

USERNAME = env_values.get("USERNAME")
HOST = env_values.get("HOST")
CONNECTION_STRING = env_values.get("CONNECTION_STRING")
PASSWORD = env_values.get("PASSWORD")

URL = env_values.get("URL")

# Extract data from local and create a spark session
IngestProcess = ExtractLoadFiletoDB()

IngestProcess.createSparkSession(masterName="local[1]", appName="ETLSpark.com", driverPos="./jars/sqljdbc42.jar")

IngestProcess.extractSourceFile(filePath=FILEPATH, delimiter=",", schema=source_schema)

source = IngestProcess.sourceDataFrame

# Rebinds the notebook-level `spark` to the session owned by the ingest helper.
spark = IngestProcess.spark

# Transform data
TransformProcess = Transform(data= source, spark=spark)

temp = TransformProcess.string_to_int(source, "NUMBER_FOLLOWS", "DB_NUMBER_FOLLOWS")

# NOTE(review): NUMBER_TRACKS is already IntegerType in the schema above --
# confirm that string_to_int is meant to be applied to it.
temp_1 = TransformProcess.string_to_int(temp, "NUMBER_TRACKS", "DB_NUMBER_TRACKS")

# Rename the converted columns to the names used by the destination table.
temp_2 = temp_1.withColumnRenamed("db_number_follows","NumberFollows").withColumnRenamed("db_number_tracks","NumberTracks")

destination = temp_2.select("Id","Username", "NumberFollows", "NumberTracks", "Link")

#Load data to table in database
IngestProcess.createDBConnection(url = URL, username=USERNAME, password=PASSWORD)

IngestProcess.writeDBTable(tablename="[dbo].[SoundCloudUser]", dataframe=destination)



In [20]:
# Preview the frame written to the database. `destination` was already selected
# with its final column names (NumberFollows, NumberTracks), so no renaming is
# needed before showing it.
destination.show()

+---+--------------------+-------------+------------+--------------------+
| Id|            Username|NumberFollows|NumberTracks|                Link|
+---+--------------------+-------------+------------+--------------------+
|  0|                   a|        14973|           0|https://soundclou...|
|  1|            Hoodadk4|        15935|           0|https://soundclou...|
|  2|           Sexyy Red|        45021|           0|https://soundclou...|
|  3|               4batz|        34867|           0|https://soundclou...|
|  4|         Sara Landry|        86088|           0|https://soundclou...|
|  5|                AKON|       238447|           0|https://soundclou...|
|  6|A BOOGIE WIT DA H...|      1587348|           0|https://soundclou...|
|  7|           21 Savage|      1671201|           0|https://soundclou...|
|  8|MC IG - 4M (CONTA...|         8449|           0|https://soundclou...|
|  9|      Nico Morrisson|         9158|           0|https://soundclou...|
| 10| Federico Maniscalco