# Exploring various pyspark methods in python & Jupyter notebook

In [1]:
import configparser
from datetime import datetime
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import row_number, desc, col, when, udf
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, from_unixtime, dayofweek
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Dat, TimestampType, LongType

In [2]:
# config = configparser.ConfigParser()
# config.read('conff.cfg')

# os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
# os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [2]:
# Create or retrieve a Spark Session
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")\
        .getOrCreate()

In [3]:
spark

In [4]:
songSchema = R([
        Fld("artist_id",Str()),
        Fld("artist_latitude",Dbl()),
        Fld("artist_location",Str()),
        Fld("artist_longitude",Dbl()),
        Fld("artist_name",Str()),
        Fld("duration",Dbl()),
        Fld("num_songs",Int()),
        Fld("song_id",Str()),
        Fld("title",Str()),
        Fld("year",Int()),
    ])

In [5]:
input_data = spark.read.json("song_data/*/*/*/*.json", schema = songSchema)

In [6]:
input_data

DataFrame[artist_id: string, artist_latitude: double, artist_location: string, artist_longitude: double, artist_name: string, duration: double, num_songs: int, song_id: string, title: string, year: int]

In [10]:
copy_df = input_data

In [50]:
song_df = copy_df.toPandas()

song_df.sample(5)

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
19,ARBEBBY1187B9B43DB,,"Gainesville, FL",,Tom Petty,236.17261,1,SOFFKZS12AB017F194,A Higher Place (Album Version),1994
50,ARD0S291187B9B7BF5,,Ohio,,Rated R,114.78159,1,SOMJBYD12A6D4F8557,Keepin It Real (Skit),0
17,ARMBR4Y1187B9990EB,37.77916,California - SF,-122.42005,David Martin,241.47546,1,SOTTDKS12AB018D69B,It Wont Be Christmas,0
48,ARGIWFO1187B9B55B7,,,,Five Bolt Main,225.09669,1,SOPSWQW12A6D4F8781,Made Like This (Live),0
54,ARJIE2Y1187B994AB7,,,,Line Renaud,152.92036,1,SOUPIRU12A6D4FA1E1,Der Kleine Dompfaff,0


In [51]:
song_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 71 entries, 0 to 70
Data columns (total 10 columns):
artist_id           71 non-null object
artist_latitude     31 non-null float64
artist_location     71 non-null object
artist_longitude    31 non-null float64
artist_name         71 non-null object
duration            71 non-null float64
num_songs           71 non-null int32
song_id             71 non-null object
title               71 non-null object
year                71 non-null int32
dtypes: float64(3), int32(2), object(5)
memory usage: 5.1+ KB


In [52]:
copy_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



In [11]:
def replace(x):
    return when(col(x) != 0, col(x)).otherwise(None)

copy_df = copy_df.withColumn("year", replace("year"))

In [12]:
def blank_as_null(x):
    return when(col(x) != "", col(x)).otherwise(None)

copy_df = copy_df.withColumn("artist_location", blank_as_null("artist_location"))

In [13]:
copy_df

DataFrame[artist_id: string, artist_latitude: double, artist_location: string, artist_longitude: double, artist_name: string, duration: double, num_songs: int, song_id: string, title: string, year: int]

In [14]:
#songs_table -- ["title", "artist_id","year", "duration"]

songs_table = copy_df.select('song_id', 'artist_id', 'year', 'duration', 'title')

In [15]:
songs_table.show(5)

+------------------+------------------+----+---------+--------------------+
|           song_id|         artist_id|year| duration|               title|
+------------------+------------------+----+---------+--------------------+
|SOBAYLL12A8C138AF9|ARDR4AC1187FB371A1|null|511.16363|Sono andati? Fing...|
|SOOLYAZ12A6701F4A6|AREBBGV1187FB523D2|null|173.66159|Laws Patrolling (...|
|SOBBUGU12A8C13E95D|ARMAC4T1187FB3FA4C|2004|207.77751|Setting Fire to S...|
|SOAOIBZ12AB01815BE|ARPBNLO1187FB3D52F|2000| 43.36281|I Hold Your Hand ...|
|SONYPOM12A8C13B2D7|ARDNS031187B9924F0|2005|186.48771|I Think My Wife I...|
+------------------+------------------+----+---------+--------------------+
only showing top 5 rows



In [16]:
artist_spec = Window.partitionBy("artist_id").orderBy(desc("year"))

In [18]:
#artist-table -- artist, "artist_name, "artist_location, "artist_latitude, artist_longitude

artist_partition = copy_df.selectExpr('artist_id', 'artist_name as name', 'artist_location as location', 'artist_longitude as longitude', 'artist_latitude as latitude', 'year').withColumn("row_number", row_number().over(artist_spec))\
.where(col('row_number') == 1)

In [22]:
artist_table = artist_partition.selectExpr('artist_id as ID', 'name', 'location', 'longitude', 'latitude')

In [23]:
artist_table.show(5)

+------------------+--------------------+--------------------+---------+--------+
|                ID|                name|            location|longitude|latitude|
+------------------+--------------------+--------------------+---------+--------+
|AR9AWNF1187B9AB0B4|Kenny G featuring...|Seattle, Washingt...|     null|    null|
|AR0IAWL1187B9A96D0|        Danilo Perez|              Panama|-80.11278|  8.4177|
|AR0RCMP1187FB3F427|    Billie Jo Spears|        Beaumont, TX|-94.10158|30.08615|
|AREDL271187FB40F44|        Soul Mekanik|                null|     null|    null|
|ARI3BMM1187FB4255E|        Alice Stuart|          Washington|  -77.029| 38.8991|
+------------------+--------------------+--------------------+---------+--------+
only showing top 5 rows



In [24]:
log_data = spark.read.json("log-data/*.json")

In [28]:
log_data = log_data.withColumn("songplay_id", monotonically_increasing_id())

In [29]:
log_data

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: double, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string, songplay_id: bigint]

In [66]:
log_data.count()

8056

In [67]:
log_data.select('songplay_id', 'ts').show(5)

+-----------+-------------+
|songplay_id|           ts|
+-----------+-------------+
|          0|1542241826796|
|          1|1542242481796|
|          2|1542242741796|
|          3|1542247071796|
|          4|1542252577796|
+-----------+-------------+
only showing top 5 rows



In [68]:
spark.sql("select timestamp(from_unixtime(1542252577796/1000,'yyyy-MM-dd HH:mm:ss.SS')) as ts").show()

+-------------------+
|                 ts|
+-------------------+
|2018-11-14 22:29:37|
+-------------------+



In [38]:
log_data = log_data.withColumn("start_time", from_unixtime(col("ts")/1000, 'yyyy-MM-dd HH:mm:ss.SS').cast("timestamp"))

In [39]:
log_data.select('start_time').show(5)

+-------------------+
|         start_time|
+-------------------+
|2018-11-14 19:30:26|
|2018-11-14 19:41:21|
|2018-11-14 19:45:41|
|2018-11-14 20:57:51|
|2018-11-14 22:29:37|
+-------------------+
only showing top 5 rows



In [40]:
log_copy = log_data

In [41]:
log_copy1 = log_copy.filter(col('page') == 'NextSong')

In [42]:
time_data = log_copy1.selectExpr('start_time').dropDuplicates().withColumn('week', weekofyear(col('start_time'))).withColumn('year', year(col('start_time')))\
.withColumn('month', month(col('start_time'))).withColumn('day', dayofmonth(col('start_time'))).withColumn('Day_of_Week', dayofweek(col('start_time')))\
.withColumn('Hour', hour(col('start_time')))

In [43]:
time_data.show(5)

+-------------------+----+----+-----+---+-----------+----+
|         start_time|week|year|month|day|Day_of_Week|Hour|
+-------------------+----+----+-----+---+-----------+----+
|2018-11-21 05:52:12|  47|2018|   11| 21|          4|   5|
|2018-11-21 14:46:29|  47|2018|   11| 21|          4|  14|
|2018-11-13 23:37:40|  46|2018|   11| 13|          3|  23|
|2018-11-14 07:14:41|  46|2018|   11| 14|          4|   7|
|2018-11-14 11:19:02|  46|2018|   11| 14|          4|  11|
+-------------------+----+----+-----+---+-----------+----+
only showing top 5 rows



In [35]:
user_spec = Window.partitionBy("userId").orderBy(desc("ts"))

In [36]:
user_partition = log_copy1.selectExpr('userId', 'firstName', 'lastName', 'gender', 'level', 'ts').withColumn("row_number", row_number().over(user_spec))\
.where(col('row_number') == 1)

In [37]:
user_table = user_partition.selectExpr('userId', 'firstName', 'lastName', 'gender', 'level').show(5)

+------+---------+--------+------+-----+
|userId|firstName|lastName|gender|level|
+------+---------+--------+------+-----+
|    51|     Maia|   Burke|     F| free|
|     7|   Adelyn|  Jordan|     F| free|
|    15|     Lily|    Koch|     F| paid|
|    54|    Kaleb|    Cook|     M| free|
|   101|   Jayden|     Fox|     M| free|
+------+---------+--------+------+-----+
only showing top 5 rows



In [78]:
user_data.show(10)

+------+----------+--------+------+-----+-------------+----------+
|userId| firstName|lastName|gender|level|           ts|row_number|
+------+----------+--------+------+-----+-------------+----------+
|    51|      Maia|   Burke|     F| free|1542673957796|         1|
|     7|    Adelyn|  Jordan|     F| free|1542592496796|         1|
|    15|      Lily|    Koch|     F| paid|1543234288796|         1|
|    54|     Kaleb|    Cook|     M| free|1543459210796|         1|
|   101|    Jayden|     Fox|     M| free|1543589497796|         1|
|    11| Christian|  Porter|     F| free|1543103174796|         1|
|    29|Jacqueline|   Lynch|     F| paid|1543423613796|         1|
|    69|  Anabelle| Simpson|     F| free|1543415274796|         1|
|    42|    Harper| Barrett|     M| paid|1542714436796|         1|
|    73|     Jacob|   Klein|     M| paid|1543544359796|         1|
+------+----------+--------+------+-----+-------------+----------+
only showing top 10 rows



In [44]:
#JOIN -- #song == title; artist == artist_name; duration == length; start_time == start_time 

copy_df.createOrReplaceTempView("songs_log")
log_copy1.createOrReplaceTempView("evts_log")
time_data.createOrReplaceTempView("time_log")

songplays_table = spark.sql("""
    select e.songplay_id, 
    e.start_time, e.userId, e.level, 
    s.song_id, s.artist_id, e.sessionId, 
    e.location, e.userAgent, t.year, t.month 
    from songs_log s INNER JOIN evts_log e ON s.title = e.song 
    AND s.artist_name = e.artist AND s.duration = e.length 
    INNER JOIN time_log t ON t.start_time = e.start_time""")

In [46]:
songplays_table.count()

1

In [48]:
time_data.write.partitionBy('year', 'month').parquet("C:/Users/Adeboye Adeniyi/Documents/Data/time_da.parquet")

In [88]:
#song == title; artist == artist_name; duration == length; start_time == start_time

song_log = songs_table.join(log_copy1, songs_table.title == log_copy1.song).collect()
#artist_log = artist_table.join(song_log, artist_table.name == song_log.artist).collect()

In [83]:
artist_log.count()

TypeError: count() takes exactly one argument (0 given)