In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StringType
import os
from dotenv import load_dotenv

# Load environment variables from root .env file
load_dotenv(os.path.join(os.path.dirname(os.path.dirname(os.path.dirname('discogs.csv'))), '.env'))

# Set Hadoop home directory
os.environ['HADOOP_HOME'] = 'C:\\hadoop'
os.environ['PATH'] = os.environ['PATH'] + ';C:\\hadoop\\bin'

# Initialize Spark session with current configuration
spark = SparkSession.builder \
    .appName("CSV Cleaner") \
    .config("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY") \
    .config("spark.hadoop.fs.defaultFS", "file:///") \
    .config("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem") \
    .getOrCreate()

# Get the local path from environment variable
local_path = os.getenv('LOCAL_PATH')
if not local_path:
    raise ValueError("LOCAL_PATH environment variable not found. Please check your .env file.")

# Construct the full path to the CSV file
csv_path = os.path.join(local_path, 'discogs.csv')

# Read the CSV file
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(csv_path) \
    .limit(100)  # For testing, remove this in production


In [2]:

# Drop unnecessary columns
df = df.drop('status', 'notes', 'label_id', 'format', 'style', 
             'master_id', 'company_name', 'release_id', 'artist_id', 'video_url')
df = df.withColumn('popularity', lit(None).cast(StringType())) \
       .withColumn('spotify_url', lit(None).cast(StringType())) \
       .withColumn('image_url', lit(None).cast(StringType()))

# Show the cleaned data
print("Cleaned Schema:")
df.printSchema()

print("\nFirst 5 rows of cleaned data:")
df.show(5, truncate=False)

print("\nTotal rows:", df.count())

NameError: name 'df' is not defined

In [11]:
import pandas as pd
# Convert to Pandas DataFrame for easier manipulation
pandas_df = df.toPandas()
# Save to CSV using Pandas
pandas_df.to_csv( 'D:/flutter_projects/musicClusterer/ml_model/data/cleaned_discogs_sample.csv', index=False)
pandas_df
# # Write to CSV using absolute path
# output_path = os.path.join(local_path, 'cleaned_discogs.csv')
# df.write \
#     .mode("overwrite") \
#     .option("header", "true") \
#     .csv(output_path)

Unnamed: 0,id,release_id,status,title,artist_id,artist_name,label_name,label_id,format,genre,style,country,release_date,notes,master_id,video_url,company_name
0,1,12295801,Accepted,The World Of Ray Price,311678,Ray Price,Columbia,1866,8-Track Cartridge,"Folk, World, & Country",,US,1970,,,,
1,2,12295802,Accepted,The Burden of Isolation,3720243,Filth (9),Not on Label (Filth (9) Self-Released),1495843,CD,Rock,Deathcore,US,2018,,1397110,https://www.youtube.com/watch?v=mRN-e_mIfQQ,
2,3,12295803,Accepted,Bassoon Concertos,6095671,Sebastian Fagerlund,BIS,51038,SACD,Classical,Contemporary,Sweden,2016,Made in the EU,,,BIS Records AB
3,4,12295805,Accepted,Ich Lag In Einer Nacht Und Schlief,3170804,Hans Peter Treichler,Gold Records,11489,Vinyl,Pop,,Switzerland,1980,,,,Gold Records
4,5,12295806,Accepted,Bien O Mal,85181,Julieta Venegas,Sony Music,25487,CD,Latin,,Argentina,2010,,1418114,https://www.youtube.com/watch?v=KTr9HMnAWNE,Lolein Music
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,50,12295857,Accepted,Santo De Pau Feito,4263371,Leonel Nunes,Discotoni,598183,Cassette,"Folk, World, & Country",,Portugal,2006,,,,
96,51,12295858,Accepted,Wandelen,2163745,De Tuinen,Beer On The Rug,242586,File,Electronic,Abstract,US,2016,‘Wandelen’ was recorded in a room.,,,
97,April,"2016""",,,,,,,,,,,,,,,
98,52,12295859,Accepted,Εδώ Είμ' Εγώ,1945520,Γιώργος Μουφλουζέλης,Lyra,73677,CD,"Folk, World, & Country",Éntekhno,Greece,1998,"""""""Καινούργια Τραγούδια""""",,,


In [None]:
# Stop Spark session
spark.stop() 
