# ETL dev
Local execution of spark.

In [None]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour
from pyspark.sql.functions import  weekofyear, date_format,dayofweek,countDistinct
from pyspark.sql.functions import  monotonically_increasing_id


config = configparser.ConfigParser()
#config.read('dl.cfg')
config.read('/home/gari/.aws/credentials')
os.environ['AWS_ACCESS_KEY_ID']=config.get('credentials','KEY')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('credentials','SECRET')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('credentials','SECRET')

In [2]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [3]:
spark = create_spark_session()

In [4]:
from pyspark.sql.types import StructType, StructField , DoubleType, StringType, IntegerType, DateType
#songSchema = StructType(exprs)

# def process_song_data(spark, input_data, output_data)

In [5]:
df = spark.read.json("/home/gari/Desktop/myGItRepos/project_spark/data/song_data/*/*/*/*.json")

In [6]:
input_data='/home/gari/Desktop/myGItRepos/project_spark/data'
output_data='./parquet_area/'
song_data = input_data+"/song_data/*/*/*/*.json"

# read song data file
df = spark.read.json(song_data)

# extract columns to create songs table
songs_table = df.select("song_id",
                        "title",
                        "artist_id",
                        "year",
                        "duration").drop_duplicates(subset=['song_id'])

# write songs table to parquet files partitioned by year and artist
songs_table.write.mode("overwrite").partitionBy("year","artist_id").parquet(output_data+"songs")

# extract columns to create artists table
artists_table = df.selectExpr("artist_id",
                            "artist_name as name",
                            "artist_location as location",
                            "artist_latitude as latitude","artist_longitude as longitude")

artists_table = artists_table.drop_duplicates(subset=['artist_id'])


# write artists table to parquet files
artists_table.write.mode("overwrite").parquet(output_data+"artists")

# def process_log_data(spark, input_data, output_data):

In [7]:
log_data = input_data+"/log-data/*.json"

In [8]:

# read log data file
df =  spark.read.json(log_data)

# filter by actions for song plays
df = df.filter(df.page == 'NextSong')

# extract columns for users table    
users_table =df.selectExpr("userId as user_id",
                            "firstName as first_name",
                            "lastName as last_name",
                            "gender","level").drop_duplicates(subset=['user_id'])

# write users table to parquet files
users_table.write.mode("overwrite").parquet(output_data+"users")
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: int(x),IntegerType())
df = df.withColumn("start_time", get_timestamp(col("ts")/1000.0)).drop_duplicates(subset=['ts'])
# get timestamp on date time
# df = df.withColumn("timestamp", to_timestamp(col("ts")/1000.0))

# create datetime column from original timestamp column
get_datetime = udf(lambda x: datetime.fromtimestamp(x/1000),DateType())
df =  df.withColumn("datetime", get_datetime(col("ts")))

# extract columns to create time table
time_table = df.select("start_time","datetime")

dict_time_func={"hour":hour,
            "day":dayofmonth,
            "week":weekofyear,
            "month":month,
            "year":year ,
            "weekday":dayofweek}


for key,value in dict_time_func.items():
    time_table = time_table.withColumn(key,value("datetime"))
    
time_table.write.mode("overwrite").partitionBy("year","month").parquet(output_data+"time")

In [9]:
song_table_read    = spark.read.parquet(output_data+"songs")["song_id","title","artist_id","duration"]
artists_table_read = spark.read.parquet(output_data+"artists")["artist_id","name"]
df_song_comp = song_table_read.join(artists_table_read, on=['artist_id'], how='left')

In [10]:
song_table_read.show(2)

+------------------+--------------------+------------------+---------+
|           song_id|               title|         artist_id| duration|
+------------------+--------------------+------------------+---------+
|SOAOIBZ12AB01815BE|I Hold Your Hand ...|ARPBNLO1187FB3D52F| 43.36281|
|SONYPOM12A8C13B2D7|I Think My Wife I...|ARDNS031187B9924F0|186.48771|
+------------------+--------------------+------------------+---------+
only showing top 2 rows



In [11]:

# extract columns from joined song and log datasets to create songplays table 
songplays_aux_table = df.selectExpr("song as title","artist as name","length as duration",
                                "start_time",
                                "userId as user_id",
                                "level",
                                "sessionId as session_id",
                                "location",
                                "userAgent as user_agent").drop_duplicates(subset=['start_time'])

songplays_aux_table = songplays_aux_table.withColumn("songplay_id", monotonically_increasing_id())

In [12]:
songplays_table = songplays_aux_table.join(df_song_comp, on=['title','name','duration'], how='left')
songplays_table = songplays_table.selectExpr('songplay_id',
                                             'start_time',
                                             'user_id',
                                             'level',
                                             'song_id',
                                             'artist_id',
                                             'session_id',
                                             'location',
                                             'user_agent')


In [14]:
songplays_table.write.mode("overwrite").parquet(output_data+"songplays")

In [13]:
songplays_table.where(col("song_id").isNotNull()).show(20)

+------------+----------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
| songplay_id|start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|
+------------+----------+-------+-----+------------------+------------------+----------+--------------------+--------------------+
|627065225236|1542837407|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|
+------------+----------+-------+-----+------------------+------------------+----------+--------------------+--------------------+



In [98]:
songplays_aux_table.select('title','name','duration').show(3)

+--------------------+--------------+---------+
|               title|          name| duration|
+--------------------+--------------+---------+
|Make Love To Your...|  Bill Withers|383.73832|
|All Hands Against...|The Black Keys|196.91057|
| I'm Still Breathing|    Katy Perry|228.49261|
+--------------------+--------------+---------+
only showing top 3 rows



In [99]:
df_song_comp.select('title','name','duration').show(3)

+--------------------+-----------+---------+
|               title|       name| duration|
+--------------------+-----------+---------+
|I Hold Your Hand ...|   Tiny Tim| 43.36281|
|I Think My Wife I...| Tim Wilson|186.48771|
|A Whiter Shade Of...|King Curtis|326.00771|
+--------------------+-----------+---------+
only showing top 3 rows

