In [1]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [2]:
spark = SparkSession \
    .builder \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
    .getOrCreate()

In [7]:
input_data = "./input_data"
song_data = os.path.join(input_data, "song_data/*/*/*/*.json")
df = spark.read.json(song_data)
df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [8]:
songs_df = df['song_id','title','artist_id','year','duration']

In [13]:
output_data = "./output_data"
songs_df.write \
.partitionBy(['year','artist_id']) \
.format('parquet') \
.mode('overwrite') \
.save(os.path.join(output_data,"songs"))

In [16]:
artists_df = df.selectExpr('artist_id' \
                           ,'artist_name as name' \
                           ,'artist_location as location' \
                           ,'artist_latitude as latitude' \
                           ,'artist_longitude as longitude')
artists_df.head(2)

[Row(artist_id='ARDR4AC1187FB371A1', name='Montserrat Caballé;Placido Domingo;Vicente Sardinero;Judith Blegen;Sherrill Milnes;Georg Solti', location='', latitude=None, longitude=None),
 Row(artist_id='AREBBGV1187FB523D2', name="Mike Jones (Featuring CJ_ Mello & Lil' Bran)", location='Houston, TX', latitude=None, longitude=None)]

In [17]:
artists_df.show()

+------------------+--------------------+--------------------+--------+----------+
|         artist_id|                name|            location|latitude| longitude|
+------------------+--------------------+--------------------+--------+----------+
|ARDR4AC1187FB371A1|Montserrat Caball...|                    |    null|      null|
|AREBBGV1187FB523D2|Mike Jones (Featu...|         Houston, TX|    null|      null|
|ARMAC4T1187FB3FA4C|The Dillinger Esc...|   Morris Plains, NJ|40.82624| -74.47995|
|ARPBNLO1187FB3D52F|            Tiny Tim|        New York, NY|40.71455| -74.00712|
|ARDNS031187B9924F0|          Tim Wilson|             Georgia|32.67828| -83.22295|
|ARNF6401187FB57032|   Sophie B. Hawkins|New York, NY [Man...|40.79086| -73.96644|
|ARLTWXK1187FB5A3F8|         King Curtis|      Fort Worth, TX|32.74863| -97.32925|
|ARPFHN61187FB575F6|         Lupe Fiasco|         Chicago, IL|41.88415| -87.63241|
|ARI2JSK1187FB496EF|Nick Ingman;Gavyn...|     London, England|51.50632|  -0.12714|
|ARO

In [18]:
artists_df.write \
.format('parquet') \
.mode('overwrite') \
.save(os.path.join(output_data,'artists'))

In [20]:
input_data = "./input_data"
log_data = os.path.join(input_data, "log_data/*/*/*.json")
log_df = spark.read.json(log_data)
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [30]:
import pyspark.sql.functions as F
log_df_with_date = log_df.withColumn('ts_timestamp', F.to_timestamp(log_df['ts'] / 1000)) \
                .withColumn('date', date_format('ts_timestamp','yyyy.MM.dd HH:mm:ss.SSS'))

In [32]:
from pyspark.sql.window import Window
users_df = log_df_with_date.selectExpr('userId as user_id' \
                           ,'firstName as first_name' \
                           ,'lastName as last_name' \
                           ,'gender as gender' \
                           ,'level as level' \
                           ,'date') \
                    .withColumn('rownum', F.rowNumber().over(Window.partitionBy('user_id').orderBy(desc('date'))))
users_df.show(2, truncate=False)

ParseException: "\nmismatched input '.' expecting ')'(line 1, pos 25)\n\n== SQL ==\nF.rowNumber().over(Window.partitionBy('user_id').orderBy(desc('date'))) as rownum\n-------------------------^^^\n"