In [2]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [3]:
spark = SparkSession.builder.appName("elt").getOrCreate()

In [18]:
input_data1 = "data/log-data/*.json"
input_data2 = "data/song_data/*/*/*/*.json"

In [19]:
df=spark.read.json(input_data1, multiLine=True)
dfs = spark.read.json(input_data2, multiLine=True)
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [20]:
df.select("page").distinct().toPandas()

Unnamed: 0,page
0,Home
1,Login
2,NextSong


In [21]:
from pyspark.sql.functions import *
dff = df.filter((col("page") =='NextSong') & col("userId").isNotNull()).groupby('userId').agg({'ts':'max'}) 

In [22]:
dff.printSchema()

root
 |-- userId: string (nullable = true)
 |-- max(ts): long (nullable = true)



In [23]:
## user table
dfl=df.join(dff, (df.ts==col("max(ts)")) & (df.userId==dff.userId), 'right').select(df.userId.alias("user_id"), col("firstName").alias("first_name"), 
                                             col("lastName").alias("last_name"), "gender", "level")

In [103]:
## Getting normal columns from table 
dft=df.filter("page='NextSong'")
dft.printSchema()
dft.count()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



20

In [107]:
## Solving Timestamp problem
# get_timestamp = udf()
date_time = df.select(from_unixtime(col("ts")/1000).alias('tsm'))
df_test = date_time.dropDuplicates()
df_test.collect()

[Row(tsm='2018-11-23 11:07:25'),
 Row(tsm='2018-11-20 11:00:42'),
 Row(tsm='2018-11-24 11:45:00'),
 Row(tsm='2018-11-29 11:00:57'),
 Row(tsm='2018-11-05 11:33:12'),
 Row(tsm='2018-11-16 11:00:57'),
 Row(tsm='2018-11-17 11:02:24'),
 Row(tsm='2018-11-12 13:36:57'),
 Row(tsm='2018-11-25 12:39:14'),
 Row(tsm='2018-11-07 11:01:16'),
 Row(tsm='2018-11-26 11:02:43'),
 Row(tsm='2018-11-04 11:15:55'),
 Row(tsm='2018-11-02 12:25:34'),
 Row(tsm='2018-11-11 13:33:56'),
 Row(tsm='2018-11-21 11:27:34'),
 Row(tsm='2018-11-27 11:52:12'),
 Row(tsm='2018-11-19 12:54:28'),
 Row(tsm='2018-11-06 13:12:44'),
 Row(tsm='2018-11-15 11:30:26'),
 Row(tsm='2018-11-02 07:57:10'),
 Row(tsm='2018-11-22 11:03:52'),
 Row(tsm='2018-11-10 11:15:27'),
 Row(tsm='2018-11-28 11:00:15'),
 Row(tsm='2018-11-14 11:03:22'),
 Row(tsm='2018-11-30 11:22:07'),
 Row(tsm='2018-11-09 11:06:17'),
 Row(tsm='2018-11-13 11:36:57'),
 Row(tsm='2018-11-03 12:04:33'),
 Row(tsm='2018-11-08 11:12:30')]

In [100]:
## songplays table
new_df = dft.join(dfs,(dft.artist==dfs.artist_name) & (dft.song==dfs.title), 'left').select(from_unixtime(col("ts")/1000).alias("start_time"), col("userId").alias("user_id"),"level", dfs.song_id, dfs.artist_id, col("sessionId").alias("session_id"), "location",col("userAgent").alias("user_agent"))

In [102]:
new_df.count()

20

In [101]:
from pyspark.sql import Window
window= Window.orderBy("start_time")
dfg = new_df.withColumn('songplay_id', row_number().over(window))
dfg.count()

20

In [28]:
## time table
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
#start_time, hour, day, week, month, year, weekday
time_table = date_time.select(col("tsm").alias("start_time"), hour(col("tsm")).alias("hour"), dayofmonth(col("tsm")).alias("day"), weekofyear(col("tsm")).alias("week"), month(col("tsm")).alias("month"), year(col("tsm")).alias("year"), dayofweek(col("tsm")).alias("weekday"))

In [29]:
time_table.show()

+-------------------+----+---+----+-----+----+-------+
|         start_time|hour|day|week|month|year|weekday|
+-------------------+----+---+----+-----+----+-------+
|2018-11-15 11:30:26|  11| 15|  46|   11|2018|      5|
|2018-11-21 11:27:34|  11| 21|  47|   11|2018|      4|
|2018-11-14 11:03:22|  11| 14|  46|   11|2018|      4|
|2018-11-28 11:00:15|  11| 28|  48|   11|2018|      4|
|2018-11-05 11:33:12|  11|  5|  45|   11|2018|      2|
|2018-11-13 11:36:57|  11| 13|  46|   11|2018|      3|
|2018-11-30 11:22:07|  11| 30|  48|   11|2018|      6|
|2018-11-16 11:00:57|  11| 16|  46|   11|2018|      6|
|2018-11-20 11:00:42|  11| 20|  47|   11|2018|      3|
|2018-11-24 11:45:00|  11| 24|  47|   11|2018|      7|
|2018-11-29 11:00:57|  11| 29|  48|   11|2018|      5|
|2018-11-19 12:54:28|  12| 19|  47|   11|2018|      2|
|2018-11-27 11:52:12|  11| 27|  48|   11|2018|      3|
|2018-11-23 11:07:25|  11| 23|  47|   11|2018|      6|
|2018-11-09 11:06:17|  11|  9|  45|   11|2018|      6|
|2018-11-2

In [30]:
## Artist Table
#artist_id, name, location, lattitude, longitude
artist = dfs.filter("artist_id is NOT NULL").select("artist_id", col("artist_location").alias("location"), col("artist_latitude").alias("latitude"),\
           col("artist_longitude").alias("longitude"))


In [47]:
a1 = dfs.alias("one").filter("artist_id is NOT NULL").groupby("artist_id").agg({'year':'max'}).orderBy("artist_id")
a1.count()


69

In [48]:
## logic to get artist' latest details by year, 
## when artist has two updates per year, records are taken as sort order whichever appear first

a2 = dfs.alias("one").filter("artist_id is NOT NULL").groupby("artist_id").agg({'year':'max'})\
.join(dfs.alias("two"), (col("one.artist_id")==col("two.artist_id")) & ("max(year)"==col("two.year")), 'left')\
.select("one.artist_id", col("artist_location").alias("location"), col("artist_latitude").alias("latitude"),\
           col("artist_longitude").alias("longitude"))

In [49]:
## let us optimize query if possible
a2.count()

69

In [333]:
## Song Table
dfs.count()

71

In [50]:
# song_id, title, artist_id, year, duration
song = dfs.filter("song_id is NOT NULL")\
.select("song_id","title", "artist_id", "year", "duration")

In [54]:
song.select("song_id", "artist_id").dropDuplicates().count()

71

In [93]:
song.write.parquet("Data/song", mode="overwrite", partitionBy=('year', 'artist_id'), compression='snappy')

In [92]:
help(df.write.parquet)

Help on method parquet in module pyspark.sql.readwriter:

parquet(path, mode=None, partitionBy=None, compression=None) method of pyspark.sql.readwriter.DataFrameWriter instance
    Saves the content of the :class:`DataFrame` in Parquet format at the specified path.
    
    :param path: the path in any Hadoop supported file system
    :param mode: specifies the behavior of the save operation when data already exists.
    
        * ``append``: Append contents of this :class:`DataFrame` to existing data.
        * ``overwrite``: Overwrite existing data.
        * ``ignore``: Silently ignore this operation if data already exists.
        * ``error`` or ``errorifexists`` (default case): Throw an exception if data already                 exists.
    :param partitionBy: names of partitioning columns
    :param compression: compression codec to use when saving to file. This can be one of the
                        known case-insensitive shorten names (none, uncompressed, snappy, gzip,
     