In [0]:
import os
import re
import sqlite3

from pyspark.sql import SparkSession
from pyspark.sql.functions import length

In [0]:
# Initialize a SparkSession
# The builder chain is wrapped in parentheses instead of using backslash
# line continuations.
spark = (
    SparkSession.builder
    .appName("ExtractCSVfile")
    .getOrCreate()
)


Extract Data

In [0]:
def check_file_exists(file_path: str) -> bool:
    """
    Report whether `file_path` can be listed through `dbutils.fs.ls`.

    Parameters:
        file_path (str): Path handed unchanged to `dbutils.fs.ls`.

    Returns:
        bool: True when the listing call completes, False when it raises
        any `Exception`.
    """
    try:
        dbutils.fs.ls(file_path)
    except Exception:
        # Every failure of the listing call is reported as "does not exist".
        return False
    else:
        return True

# csv file path
file_path = "dbfs:/FileStore/tables/spotify_songs.csv"
# "/dbfs/FileStore/tables/spotify_songs.csv"

# Report the outcome of the existence check in a single print call.
print(
    f"The file {file_path} exists."
    if check_file_exists(file_path)
    else f"The file {file_path} does not exist."
)


The file dbfs:/FileStore/tables/spotify_songs.csv exists.


In [0]:
def extract_csv_file(file_path: str, columns=None):
    """
    Extract Spotify data from a CSV file into a Spark DataFrame.

    Parameters:
        file_path (str): Path of the CSV file. It is first checked with
            `check_file_exists` and then read with `spark.read.csv`
            (header row enabled, schema inferred).
        columns (str | list[str] | None): Column name(s) to keep. When
            omitted, the six track/album/genre columns this notebook works
            with are selected.

    Returns:
        DataFrame | None: The selected columns as a Spark DataFrame, or
        None when the file is missing or reading/selecting fails. In the
        failure case the reason is printed rather than raised.
    """
    default_columns = (
        "track_id",
        "track_artist",
        "track_name",
        "track_album_id",
        "track_album_name",
        "playlist_genre",
    )
    if columns is None:
        selected = list(default_columns)
    elif isinstance(columns, str):
        # A bare string means one column, not a sequence of characters.
        selected = [columns]
    else:
        selected = list(columns)

    try:
        # Check if the file exists
        if not check_file_exists(file_path):
            raise FileNotFoundError(f"The file at {file_path} does not exist.")

        # Read CSV file into DataFrame with options
        print(f"Reading CSV file at {file_path}")
        df = spark.read.csv(file_path, header=True, inferSchema=True)

        # Keep only the requested columns
        return df.select(*selected)
    except FileNotFoundError as fnf_error:
        print(f"File not found: {fnf_error}")
    except Exception as e:
        # Best-effort extraction: report the problem and fall through to None.
        print(f"Failed to extract the file: {e}")
    return None




In [0]:
file_path = "dbfs:/FileStore/tables/spotify_songs.csv"
df = extract_csv_file(file_path)

# extract_csv_file returns None when extraction fails, so test for that
# explicitly instead of relying on the truthiness of a DataFrame object.
if df is not None:
    df.display()
else:
    print("No DataFrame returned.")

Reading CSV file at dbfs:/FileStore/tables/spotify_songs.csv


track_id,track_artist,track_name,track_album_id,track_album_name,playlist_genre
6f807x0ima9a1j3VPbc7VN,Ed Sheeran,I Don't Care (with Justin Bieber) - Loud Luxury Remix,2oCs0DGTsRO98Gh5ZSl2Cx,I Don't Care (with Justin Bieber) [Loud Luxury Remix],pop
0r7CVbZTWZgbTCYdfa2P31,Maroon 5,Memories - Dillon Francis Remix,63rPSO264uRjW1X5E6cWv6,Memories (Dillon Francis Remix),pop
1z1Hg7Vb0AhHDiEmnDE79l,Zara Larsson,All the Time - Don Diablo Remix,1HoSmj2eLcsrR0vE9gThr4,All the Time (Don Diablo Remix),pop
75FpbthrwQmzHlBJLuGdC7,The Chainsmokers,Call You Mine - Keanu Silva Remix,1nqYsOef1yKKuGOVchbsk6,Call You Mine - The Remixes,pop
1e8PAfcKUYoKkxPhrHqw4x,Lewis Capaldi,Someone You Loved - Future Humans Remix,7m7vv9wlQ4i0LFuJiE2zsQ,Someone You Loved (Future Humans Remix),pop
7fvUMiyapMsRRxr07cU8Ef,Ed Sheeran,Beautiful People (feat. Khalid) - Jack Wins Remix,2yiy9cd2QktrNvWC2EUi0k,Beautiful People (feat. Khalid) [Jack Wins Remix],pop
2OAylPUDDfwRGfe0lYqlCQ,Katy Perry,Never Really Over - R3HAB Remix,7INHYSeusaFlyrHSNxm8qH,Never Really Over (R3HAB Remix),pop
6b1RNvAcJjQH73eZO4BLAB,Sam Feldt,Post Malone (feat. RANI) - GATTÜSO Remix,6703SRPsLkS4bPtMFFJes1,Post Malone (feat. RANI) [GATTÜSO Remix],pop
7bF6tCO3gFb8INrEDcjNT5,Avicii,Tough Love - Tiësto Remix / Radio Edit,7CvAfGvq4RlIwEbT9o8Iav,Tough Love (Tiësto Remix),pop
1IXGILkPm0tOCNeq00kCPa,Shawn Mendes,If I Can't Have You - Gryffin Remix,4QxzbfSsVryEQwvPFEV5Iu,If I Can't Have You (Gryffin Remix),pop


In [0]:
# Print the DataFrame schema: each column's name, data type and nullability
df.printSchema()

root
 |-- track_id: string (nullable = true)
 |-- track_artist: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- track_album_id: string (nullable = true)
 |-- track_album_name: string (nullable = true)
 |-- playlist_genre: string (nullable = true)



In [0]:
# get row count
# print(df.count())  # count() returns a plain int, so it has no .show()

In [0]:


# Add a new column 'track_id_length' with the length of each 'track_id' value
# df_with_length = df.withColumn("track_id_length", length(df["track_id"]))

# # Show the resulting DataFrame
# df_with_length.show()



Data Quality Check Function

In [0]:
def perform_data_quality_checks(df):
    """
    Perform data quality checks on a Spark DataFrame.

    Steps, in order:
      1. Every column is checked for nulls; the first column containing
         any raises.
      2. Rows are de-duplicated on (track_id, track_artist, track_name).
      3. `track_id` must be unique across the remaining rows.

    Parameters:
        df (DataFrame): The Spark DataFrame to check.

    Returns:
        DataFrame: The de-duplicated DataFrame, once all checks pass.

    Raises:
        ValueError: If a column contains nulls, or if `track_id` is still
            duplicated after de-duplication. The message is printed before
            the exception is re-raised.
    """
    dedup_key = ["track_id", "track_artist", "track_name"]
    try:
        # Check for null values; all columns are treated as crucial.
        for column_name in df.columns:
            null_count = df.filter(df[column_name].isNull()).count()
            if null_count > 0:
                raise ValueError(
                    f"Column {column_name} contains {null_count} null values."
                )

        # Remove rows that repeat the same track_id / artist / name combination.
        df = df.dropDuplicates(dedup_key)

        # No separate full-row duplicate check is needed here: rows that are
        # unique on the key above cannot be identical across all columns, and
        # the track_id check below is stricter still.
        row_count = df.count()

        # Check that track_id values are unique
        if df.select("track_id").distinct().count() != row_count:
            raise ValueError("Column track_id contains duplicate values")

        return df
    except ValueError as e:
        print(f"Data quality check failed {e}")
        raise
  

Transform Data with Quality Checks

In [0]:
# Clean and transform data.
# Rows with a null in any column are dropped first, because
# perform_data_quality_checks raises on the first null it finds.
df_cleaned = df.dropna()

# Normalise column names: lower-case them and replace every character outside
# [a-zA-Z0-9_] with an underscore, so the result stays inside the allowed set
# (a space would not).
normalized_names = [
    re.sub(r'[^a-zA-Z0-9_]', '_', column_name.lower())
    for column_name in df_cleaned.columns
]
df_cleaned = df_cleaned.toDF(*normalized_names)

# Perform data quality checks
df_validated = perform_data_quality_checks(df_cleaned)


In [0]:
# Render the validated DataFrame in the notebook output
display(df_validated)

track_id,track_artist,track_name,track_album_id,track_album_name,playlist_genre
002xjHwzEx66OWFV2IP9dk,RIKA,The Others,1ficfUnZMaY1QkNp15Slzm,The Others,r&b
008MceT31RotUANsKuzy3L,The.madpix.project,Liquid Blue,1Z4ANBVuhTlS6DprlP0m1q,Liquid Blue,pop
00HIh9mVUQQAycsQiciWsh,Magic City Hippies,Limestone,7mtoEwzZYBqG8JYItxcccG,Hippie Castle EP,pop
00NAQYOP4AmWR549nnYJZu,The Weeknd,Secrets,2ODvWsOgouMbaA5xf0RkJe,Starboy,r&b
00UpV14MDfk4CvrMbFvqji,Sevyn Streeter,It Won't Stop (feat. Chris Brown) - Julian Calor Remix,52FeJmVsUJfoQybiwI5j9m,Atlantic Records Miami 2014,edm
00X2yv2vrtritPt2CZnUTZ,PNL,Bené,5GFHFEASZeJF0gyWuDDjGE,Dans la légende,r&b
00chLpzhgVjxs1zKC9UScL,Bell Biv DeVoe,Poison,6oZ6brjB8x3GoeSYdwJdPc,Gold,r&b
00emjlCv9azBN0fzuuyLqy,KARD,Dumb Litty,7h5X3xhh3peIK9Y0qI5hbK,KARD 2nd Digital Single ‘Dumb Litty’,pop
00i0O74dXdaKKdCrqHnfXm,Ricky Martin,La Mordidita,375cUd86z58eqXN2yW3Do9,A Quien Quiera Escuchar (Deluxe Edition),latin
00i2HU7TEzzftShjRrDSEF,2Pac,Changes,4Y9ISbppFbwk0r1XCLUi0I,The Best of 2Pac - Pt. 1: Thug,rap


Load Data

In [0]:

# Connection settings for the PostgreSQL source. Each value can be overridden
# through an environment variable; the fallbacks are the values this cell
# used before, so behaviour is unchanged when nothing is set.
jdbc_hostname = os.environ.get("POSTGRES_HOST", "localhost")
jdbc_port = os.environ.get("POSTGRES_PORT", "5432")  # 5432 is the default PostgreSQL port
jdbc_database = os.environ.get("POSTGRES_DB", "postgres")

# Define the JDBC URL
jdbc_url = f'jdbc:postgresql://{jdbc_hostname}:{jdbc_port}/{jdbc_database}'

# Define connection properties. The URL is passed to spark.read.jdbc directly,
# so it is not repeated in this dict.
# NOTE(review): the fallback user/password are kept only for backward
# compatibility; supply real credentials through the environment (or a
# secret store) rather than committing them to the notebook.
connection_props = {
    "user": os.environ.get("POSTGRES_USER", "postgres"),
    "password": os.environ.get("POSTGRES_PASSWORD", "postgres"),
    "driver": "org.postgresql.Driver",
}

# NOTE(review): this rebinds `df`, replacing the DataFrame produced by the
# extract step; re-running earlier transform cells afterwards would pick up
# this JDBC result instead.
df = spark.read.jdbc(url=jdbc_url, table='spotify_playlist_artist', properties=connection_props)

df.show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
File [0;32m<command-3425336956246023>:17[0m
[1;32m      8[0m [38;5;66;03m# Define connection properties[39;00m
[1;32m      9[0m connection_props [38;5;241m=[39m { [38;5;124m"[39m[38;5;124muser[39m[38;5;124m"[39m: [38;5;124m"[39m[38;5;124mpostgres[39m[38;5;124m"[39m, 
[1;32m     10[0m                     [38;5;124m"[39m[38;5;124mpassword[39m[38;5;124m"[39m: [38;5;124m"[39m[38;5;124mpostgres[39m[38;5;124m"[39m, 
[1;32m     11[0m                     [38;5;124m"[39m[38;5;124mdriver[39m[38;5;124m"[39m: [38;5;124m"[39m[38;5;124morg.postgresql.Driver[39m[38;5;124m"[39m, 
[1;32m     12[0m                     [38;5;124m"[39m[38;5;124murl[39m[38;5;124m"[39m: jdbc_url
[1;32m     13[0m                   }
[0;32m---> 17[0m df [38;5;241m=[39m spark[38;5;241m.[39mr

In [0]:
# Create the /databricks/jars directory, then write a file at the JAR path.
# NOTE(review): per the Databricks utilities reference, dbutils.fs.put(file,
# contents, overwrite) writes the `contents` string as the file's text. If so,
# this stores the literal text "postgresql/postgresql-42.2.23.jar" under a .jar
# name rather than copying the driver JAR — confirm the intent (dbutils.fs.cp
# from the JAR's real location may be what was meant).
dbutils.fs.mkdirs("/databricks/jars")
dbutils.fs.put("/databricks/jars/postgresql-42.2.23.jar", "postgresql/postgresql-42.2.23.jar", True)


