In [0]:
import json
import time

from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import row_number
from pyspark.sql.functions import substring
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [0]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Exemple de lecture CSV") \
    .getOrCreate()

# Define schema for user streaming history DataFrame
user_streaming_history_schema = StructType([
    StructField("StreamingId", IntegerType(), True),
    StructField("TrackId", StringType(), True),
    StructField("Timestamp", StringType(), True),
    StructField("Username", StringType(), True),
    StructField("Platform", StringType(), True),
    StructField("Timeplayed", IntegerType(), True),
    StructField("Conncountry", StringType(), True)
])

# Define the associative schema for user streaming history and track DataFrame
streaming_track_schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("StreamingId", StringType(), True),
    StructField("TrackId", StringType(), True)
])

# Create DataFrame for each defined schema
df_streaming = spark.createDataFrame([], schema=user_streaming_history_schema)
df_streaming_track = spark.createDataFrame([], schema=streaming_track_schema)

# Read CSV file
file_path = '/mnt/raw/user_data/test_user_data.csv'
df_csv = spark.read.csv(file_path, header=True)

# Get current timestamp
start_time = time.time()

# The streaming_id
streaming_count = 0

track_data = set()

# Need to create the associative table
streaming_track_dicts = {}

# Iterate over each row in the CSV DataFrame
for row in df_csv.rdd.collect():
    # Increment the streaming Id
    streaming_count += 1
    streamingId = int(streaming_count)
    trackId = str(row['spotify_track_uri'])
    timestamp = str(row['ts'])
    username = str(row['username'])
    platform = str(row['platform'])
    timeplayed = int(row['ms_played'])
    conncountry = str(row['conn_country'])

    
    # Create a Spark DataFrame Row
    streaming_row = Row(StreamingId=streamingId, TrackId=trackId, Timestamp=timestamp, 
                        Username=username, Platform=platform, Timeplayed=timeplayed, 
                        Conncountry=conncountry)
    
    track_data.add(trackId)
    streaming_track_dicts[streaming_count] = trackId

    # Append the Row to the user streaming DataFrame
    df_streaming = df_streaming.union(spark.createDataFrame([streaming_row], schema=user_streaming_history_schema))

#print(streaming_track_dicts)


streaming_track_id = 0

# Iterate over each track from the set to create the associative dataFrame
for trackId in track_data:

    # For each track I iterate over the streaming_track_dicts dictonnary 
    for stream in streaming_track_dicts:
        if (trackId == streaming_track_dicts[stream]):
            # Increment the streaming_track Id
            streaming_track_id += 1

            # Create a Spark DataFrame Row
            streaming_track_row = Row(ID=streaming_track_id,StreamingId=stream,TrackId=trackId)

            # Append the Row to the user streaming history and track DataFrame
            df_streaming_track = df_streaming_track.union(spark.createDataFrame([streaming_track_row], schema=streaming_track_schema))

# Print elapsed time
print("Elapsed time:", time.time() - start_time)

# df
df_streaming = df_streaming.withColumn('TrackId', substring('TrackId',15,23))
df_streaming_track = df_streaming_track.withColumn('TrackId', substring('TrackId',15,23))


In [0]:
# Show DataFrame schema and data
df_streaming.printSchema()
df_streaming.show()

In [0]:
# Show DataFrame schema and data
df_streaming_track.printSchema()
df_streaming_track.show()

In [0]:
# Write the transformed file ADL Processed Folder
df_streaming.write.mode("overwrite").csv("/mnt/processed/processed-user-data")
df_streaming_track.write.mode("overwrite").csv("/mnt/processed/processed-user-track")