In [None]:
from datetime import datetime
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import udf, col, asc, desc, countDistinct
from pyspark.sql.functions import date_format, row_number, monotonically_increasing_id 
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DoubleType, TimestampType


In [None]:
import zipfile as zf

# Unpack the raw archives. The context managers close each archive even when
# extraction raises, so no file handle is leaked on a failed run.
with zf.ZipFile('data/log-data.zip', 'r') as log_files:
    log_files.extractall('data/log_data')

# The song archive is extracted straight into 'data'; the songs are later read
# from 'data/song_data/...', so the archive presumably carries its own
# song_data/ directory -- TODO confirm against the zip contents.
with zf.ZipFile('data/song-data.zip', 'r') as song_files:
    song_files.extractall('data')

In [None]:
# Reuse the active Spark session if there is one; otherwise start a new one
# registered under the application name 'DataLake'.
spark = (
    SparkSession.builder
    .appName('DataLake')
    .getOrCreate()
)

In [None]:
# Load the raw JSON inputs; Spark infers both schemas from the data.
# The song glob matches entries four levels below data/song_data.
logDf = spark.read.format('json').load('data/log_data/*')
songDf = spark.read.format('json').load('data/song_data/*/*/*/*')

In [None]:
# Quick look at the raw song metadata: one sample row, the row count and the
# inferred schema.
songDf.show(1)
# A notebook only echoes a cell's last expression, so a bare count() here would
# run a Spark job and throw the result away; print it explicitly instead.
print('song rows:', songDf.count())
songDf.printSchema()

In [None]:
# Target schema of the songs table: song_id and title are required, the
# remaining columns may be null.
songs_schema = (
    StructType()
    .add('song_id', StringType(), nullable=False)
    .add('title', StringType(), nullable=False)
    .add('artist_id', StringType(), nullable=True)
    .add('year', LongType(), nullable=True)
    .add('duration', DoubleType(), nullable=True)
)

In [None]:
# Only rows with both required songs_schema columns present are kept.
has_song_key = col('song_id').isNotNull() & col('title').isNotNull()

songs_rdd = (
    songDf
    .where(has_song_key)
    .select('song_id', 'title', 'artist_id', 'year', 'duration')
    .rdd
)

# Rebuild the frame from the RDD so the explicit schema (including its
# nullability flags) is the one attached to the result.
songs = spark.createDataFrame(songs_rdd, schema=songs_schema)

songs.show(n=2)

In [None]:
# Shuffle rows by (year, artist_id), then overwrite the parquet output.
# NOTE(review): repartition() only groups rows in memory before the write; if an
# on-disk year/artist directory layout was intended, that needs
# write.partitionBy(...) -- confirm the requirement.
songs_by_year_artist = songs.repartition('year', 'artist_id')

songs_by_year_artist.write.parquet('tables/songs/songs.parquet', mode='overwrite')

In [None]:
# Sanity check: read the written table back and query it through Spark SQL.
songs_read = spark.read.parquet('tables/songs/songs.parquet')
songs_read.createOrReplaceTempView('songs')

for query in (
    'SELECT * FROM songs WHERE title LIKE "Law%"',
    'SELECT COUNT(*) FROM songs',
):
    spark.sql(query).show()

In [None]:
# Target schema of the artists table: artist_id and name are required, the
# location fields may be null.
artists_schema = (
    StructType()
    .add('artist_id', StringType(), nullable=False)
    .add('name', StringType(), nullable=False)
    .add('location', StringType(), nullable=True)
    .add('latitude', DoubleType(), nullable=True)
    .add('longitude', DoubleType(), nullable=True)
)

In [None]:
# Build the artists table from the song metadata.
# The same artist_id can occur on several song rows, so without de-duplication
# the table would hold one row per song instead of one per artist.
# dropDuplicates keeps a single (arbitrary) row for each artist_id.
artists_rdd = songDf \
    .filter(col('artist_id').isNotNull()) \
    .filter(col('artist_name').isNotNull()) \
    .select('artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude') \
    .dropDuplicates(['artist_id']) \
    .rdd

# The schema is applied by position, which renames artist_name -> name,
# artist_location -> location, and so on.
artists = spark.createDataFrame(artists_rdd, artists_schema)

artists.show(2)

In [None]:
# Overwrite like the songs/time/songplays writers do; with the default save
# mode a re-run of the notebook fails because the output path already exists.
artists.write.mode('overwrite').parquet('tables/artists/artists.parquet')

In [None]:
# Quick look at the raw event log: one sample row, the row count and the
# inferred schema.
logDf.show(1)
# A notebook only echoes a cell's last expression, so a bare count() here would
# run a Spark job and throw the result away; print it explicitly instead.
print('log rows:', logDf.count())
logDf.printSchema()

In [None]:
# Total number of log rows, followed by the number of rows per userId.
# The total is printed because a non-final expression is not echoed by the
# notebook and would otherwise be computed for nothing.
print('log rows:', logDf.count())
logDf.groupBy('userId').count().show()

In [None]:
# Scratch example for row_number(): rows are numbered within each name,
# starting from the highest age.
d = [
    dict(name='Alice', age=1),
    dict(name='Alice', age=2),
    dict(name='Bob', age=5),
]

df = spark.createDataFrame(d)
df.show()

w = (
    Window
    .partitionBy('name')
    .orderBy(desc('age'))
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df.withColumn('rn', row_number().over(w)).show()


In [None]:
# Target schema of the users table: only user_id is required.
users_schema = (
    StructType()
    .add('user_id', LongType(), nullable=False)
    .add('first_name', StringType(), nullable=True)
    .add('last_name', StringType(), nullable=True)
    .add('gender', StringType(), nullable=True)
    .add('level', StringType(), nullable=True)
)

In [None]:
# Per-user window ordered newest event first, so row number 1 is the most
# recent event of each userId.
users_window = (
    Window
    .partitionBy('userId')
    .orderBy(desc('ts'))
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Only 'NextSong' events that carry a userId take part.
is_song_play = (col('page') == 'NextSong') & col('userId').isNotNull()

users_rdd = (
    logDf
    .where(is_song_play)
    .withColumn('num', row_number().over(users_window))
    .withColumn('user_id', col('userId').cast('long'))
    .where(col('num') == 1)  # keep the latest event per user
    .select('user_id', 'firstName', 'lastName', 'gender', 'level')
    .rdd
)

# The schema is applied by position, renaming firstName/lastName to
# first_name/last_name.
users = spark.createDataFrame(users_rdd, schema=users_schema)

users.show(n=2)

In [None]:
# Overwrite like the songs/time/songplays writers do; with the default save
# mode a re-run of the notebook fails because the output path already exists.
users.write.mode('overwrite').parquet('tables/users/users.parquet')

In [None]:
# Target schema of the time table: the timestamp plus its integer calendar
# parts, all required.
time_schema = (
    StructType()
    .add('start_time', TimestampType(), nullable=False)
    .add('hour', IntegerType(), nullable=False)
    .add('day', IntegerType(), nullable=False)
    .add('week', IntegerType(), nullable=False)
    .add('month', IntegerType(), nullable=False)
    .add('year', IntegerType(), nullable=False)
    .add('weekday', IntegerType(), nullable=False)
)

In [None]:
# Peek at the raw 'ts' values of the first ten log rows.
logDf.select(col('ts')).show(n=10)

In [None]:
# Build the time table: one row per distinct log timestamp, broken into
# calendar parts.
time_rdd = logDf \
    .select('ts') \
    .distinct() \
    .withColumn('timestamp', (col('ts') / 1000).cast(TimestampType())) \
    .select(
        col('timestamp').alias('start_time'),
        hour('timestamp').alias('hour'),
        dayofmonth('timestamp').alias('day'),
        weekofyear('timestamp').alias('week'),
        month('timestamp').alias('month'),
        year('timestamp').alias('year'),
        # The 'F' date_format pattern is the day-of-week-in-month ordinal, not
        # the weekday. dayofweek() returns the weekday as an integer
        # (1 = Sunday ... 7 = Saturday), so no cast is needed.
        F.dayofweek('timestamp').alias('weekday')
    ) \
    .rdd

# ts is divided by 1000 before the timestamp cast, i.e. it is treated as epoch
# milliseconds. The explicit schema marks every column as non-nullable.
time = spark.createDataFrame(time_rdd, time_schema)

time.show(2)

In [None]:
# Shuffle rows by (year, month), then overwrite the parquet output.
time_by_year_month = time.repartition('year', 'month')

time_by_year_month.write.parquet('tables/time/time.parquet', mode='overwrite')

In [None]:
# Target schema of the songplays table: the ids and start_time are required,
# the descriptive columns may be null.
songplays_schema = (
    StructType()
    .add('songplay_id', LongType(), nullable=False)
    .add('start_time', TimestampType(), nullable=False)
    .add('user_id', LongType(), nullable=False)
    .add('level', StringType(), nullable=True)
    .add('song_id', StringType(), nullable=False)
    .add('artist_id', StringType(), nullable=False)
    .add('session_id', LongType(), nullable=True)
    .add('location', StringType(), nullable=True)
    .add('user_agent', StringType(), nullable=True)
)

In [None]:
# Only 'NextSong' log events count as plays.
clean_logDf = logDf.where(col('page') == 'NextSong')

# Song rows must carry both ids that the songplays schema requires.
clean_songDf = songDf.where(
    col('song_id').isNotNull() & col('artist_id').isNotNull()
)

# A log event matches a song when title, artist name and duration all agree.
same_song = (
    (clean_songDf.title == clean_logDf.song)
    & (clean_songDf.artist_name == clean_logDf.artist)
    & (clean_songDf.duration == clean_logDf.length)
)

songplays_rdd = (
    clean_songDf
    .join(clean_logDf, on=same_song, how='inner')
    # Generated ids are unique and increasing but not consecutive; +1 makes
    # them start at 1 instead of 0.
    .withColumn('id', monotonically_increasing_id() + 1)
    .withColumn('start_time', (col('ts') / 1000).cast(TimestampType()))
    .withColumn('user_id', col('userId').cast('long'))
    .select('id', 'start_time', 'user_id', 'level', 'song_id', 'artist_id', 'sessionId', 'location', 'userAgent')
    .rdd
)

# The schema is applied by position, renaming id -> songplay_id,
# sessionId -> session_id and userAgent -> user_agent.
songplays = spark.createDataFrame(songplays_rdd, schema=songplays_schema)

songplays.show(n=2)

In [None]:
# Shuffle rows by the year and month of start_time, then overwrite the parquet
# output.
songplays_by_year_month = songplays.repartition(
    year('start_time'),
    month('start_time'),
)

songplays_by_year_month.write.parquet('tables/songplays/songplays.parquet', mode='overwrite')