In [1]:
!pip install spark-nlp==1.7.3



In [2]:
!java -version
!python --version

openjdk version "1.8.0_242"
OpenJDK Runtime Environment (build 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08)
OpenJDK 64-Bit Server VM (build 25.242-b08, mixed mode)
Python 3.7.6


In [3]:
from pyspark.sql.types import TimestampType, StructType, StructField, FloatType, IntegerType, LongType, StringType, DataType
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear
from pyspark.sql.functions import monotonically_increasing_id
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import udf,col
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from datetime import datetime
import configparser
import pandas as pd
import os

# MY FUNCTIONS

In [4]:
def check_parquet(parquet_path):
    ! ls 2>&1 -lh $parquet_path | head -10
    ! echo 'Parquet Files:' $(ls | wc -l)
    table_parquet = spark.read.parquet(parquet_path)
    print('DataFrame rows: %d' % table_parquet.count())
    print('DataFrame schema: %s' % table_parquet)
    table_parquet.show(10, False)
    return table_parquet

In [5]:
def clean_timestamp(df):
    # convert timestamps to date time from epoch time so we can get hour of the day
    get_timestamp = F.udf(lambda x: datetime.fromtimestamp(x/1000), T.TimestampType())
    # add a new column `formated_ts` in our dataframe
    df_log_copy = df.withColumn("formated_ts", get_timestamp(df.ts))
    # remove rows with empty ts value
    df_formated = df_log_copy.dropna(subset='ts')
    return df_formated

In [6]:
def write_parquet_song(table, parquet_path):
    table.write.partitionBy("year", "artist_id").parquet(parquet_path, mode = 'overwrite')

In [7]:
def write_parquet(table, parquet_path):
    table.write.parquet(parquet_path, mode = 'overwrite')

In [8]:
def write_parquet_time(table, parquet_path):
    table.write.partitionBy(['year', 'month']).parquet(parquet_path, mode = 'overwrite')

# GET AWS KEYS

In [9]:
config = configparser.ConfigParser()
config.read('dl.cfg')
os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['KEY']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['SECRET']

# INITIATE SPARK SESSION

In [10]:
def create_spark_session():
    """
        Create or load a Spark session
    """
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

spark = create_spark_session()
spark

# PROCESS SONG DATA

In [11]:
input_song = "s3a://udacity-dend/song_data/A/A/A/*.json"

In [12]:
def process_song_data(spark, input_data):
    '''
    process song data
    
    return df_song
    '''
    # read  data file
    song_schema = StructType([
        StructField("num_songs", IntegerType()),
        StructField("artist_id", StringType()),
        StructField("artist_latitude", FloatType()),
        StructField("artist_longitude", FloatType()),
        StructField("artist_location", StringType()),
        StructField("artist_name", StringType()),
        StructField("song_id", StringType()),
        StructField("title", StringType()),
        StructField("duration", FloatType()),
        StructField("year", IntegerType())
    ])    
    df_song = spark.read.json(input_data, schema = song_schema)
    # print('DataFrame rows: %d' % df_song.count())
    df_song.printSchema()
    print('DataFrame schema: %s' % df_song)
    return df_song

In [13]:
df_song = process_song_data(spark, input_song)

root
 |-- num_songs: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: float (nullable = true)
 |-- artist_longitude: float (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: float (nullable = true)
 |-- year: integer (nullable = true)

DataFrame schema: DataFrame[num_songs: int, artist_id: string, artist_latitude: float, artist_longitude: float, artist_location: string, artist_name: string, song_id: string, title: string, duration: float, year: int]


### Create songs Table

* varchar : song_id, title, artist_id
* float : user_id
* int: year
* NOT NULL : song_id, title, artist_id

In [14]:
def create_songs_table(df):
    table = df_song \
        .drop_duplicates(['song_id']) \
        .select("song_id", "title", "artist_id", "year", "duration") \
        .filter('song_id != "" and title != "" and artist_id != ""') \
        .sort("song_id") 
    return(table)

In [15]:
# process and check
songs_table = create_songs_table(df_song)
songs_table.show(2)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOABWAP12A8C13F82A|           Take Time|AR5LMPY1187FB573FE|1978|258.89914|
|SOAFBCP12A8C13CC7D|King Of Scurf (20...|ARTC1LV1187B9A4858|1972|301.40036|
+------------------+--------------------+------------------+----+---------+
only showing top 2 rows



In [16]:
songs_table.collect()
parquet_path = 'output/songs_table'
write_parquet_song(songs_table, parquet_path)
# check_parquet(parquet_path)

### Create Artists Table

* varchar : artist_id, name, location
* float : latitude, longitude
* NOT NULL : artist_id, name

In [17]:
def create_artists_table(df):
    table = df \
        .drop_duplicates(['artist_id']) \
        .selectExpr("artist_id", "artist_name as name", "artist_location as location", "artist_latitude as latitude", "artist_longitude as longitude") \
        .filter('artist_id != "" and name != ""') \
        .sort("artist_id")
    return table

In [18]:
# process and check
artists_table = create_artists_table(df_song)
artists_table.show(2)

+------------------+--------------------+--------------------+--------+---------+
|         artist_id|                name|            location|latitude|longitude|
+------------------+--------------------+--------------------+--------+---------+
|AR0MWD61187B9B2B12|International Noi...|                    |    null|     null|
|AR10USD1187B99F3F1|Tweeterfriendly M...|Burlington, Ontar...|    null|     null|
+------------------+--------------------+--------------------+--------+---------+
only showing top 2 rows



In [19]:
artists_table.collect()
parquet_path = 'output/artists_table'
write_parquet(artists_table, parquet_path)
# check_parquet(parquet_path)

# LOG_DATA

In [20]:
input_log = "s3a://udacity-dend/log_data/*/*/*.json"

In [21]:
def process_log_data(spark, input_log):
    '''
    process log data
    
    return df_log
    '''
    # read log data file
    log_schema = StructType([
        StructField("artist", StringType()),
        StructField("auth", StringType()),
        StructField("firstName", StringType()),
        StructField("gender", StringType()),
        StructField("itemInSession", IntegerType()),
        StructField("lastName", StringType()),
        StructField("length", FloatType()),    
        StructField("level", StringType()),
        StructField("location", StringType()),
        StructField("method", StringType()),
        StructField("page", StringType()),
        StructField("registration", FloatType()),
        StructField("sessionId", StringType()),
        StructField("song", StringType()),
        StructField("status", IntegerType()),
        StructField("ts", LongType()),
        StructField("userAgent", StringType()),
        StructField("userId", StringType())
    ])
    
    df_log_raw = spark.read.json(input_log, schema = log_schema)
    # print('DataFrame raw: %d' % df_log_raw.count())
    df_log_next = df_log_raw.filter("page='NextSong'")
    # print('DataFrame next: %d' % df_log_next.count())
    df_log=clean_timestamp(df_log_next)    
    # print('DataFrame rows: %d' % df_log.count())
    df_log.printSchema()
    print('DataFrame schema: %s' % df_log)
    return df_log

In [22]:
df_log= process_log_data(spark, input_log)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: float (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: float (nullable = true)
 |-- sessionId: string (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- formated_ts: timestamp (nullable = true)

DataFrame schema: DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: int, lastName: string, length: float, level: string, location: string, method: string, page: string, registration: float, sessionId: string, so

### Create users table

* bigint: user_Id
* varchar first_name, last_name, gender
* NOT NULL : user_id, level

In [23]:
def create_users_table(df):
    users_table = df \
        .drop_duplicates(subset = ['userId']) \
        .filter('level != ""' and 'userId != ""') \
        .orderBy("ts", ascending = False) \
        .coalesce(1)\
        .selectExpr("cast(userId as Long) user_id", "firstName as first_name", "lastName as last_name", "gender", "level")   \
        .sort('user_id')
    return users_table

In [24]:
# process and check
users_table = create_users_table(df_log)
users_table.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|      2|   Jizelle| Benjamin|     F| free|
|      3|     Isaac|   Valdez|     M| free|
|      4|    Alivia|  Terrell|     F| free|
|      5|    Elijah|    Davis|     M| free|
|      6|   Cecilia|    Owens|     F| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [25]:
users_table.collect()
parquet_path = 'output/users_table'
write_parquet(users_table, parquet_path)
# check_parquet(parquet_path)

### Create Time table

* timestamp: start_time
* int: hour, day, week, month, year
* varchar:  weekday
* NOT NULL : start_time

In [26]:
def create_time_table(df):
    time_df = df \
        .drop_duplicates(['formated_ts']) \
        .select( \
            col('formated_ts').alias("start_time"),
            hour(col('formated_ts')).alias('hour'),
            dayofmonth(col('formated_ts')).alias('day'),
            weekofyear(col('formated_ts')).alias('week'),
            month(col('formated_ts')).alias('month'),
            year(col('formated_ts')).alias('year')) \
        .sort('start_time')

    time_table = time_df.withColumn('hour', F.hour('start_time')) \
                    .withColumn('day', F.dayofmonth('start_time')) \
                    .withColumn('year', F.year('start_time')) \
                    .withColumn('week', F.weekofyear('start_time')) \
                    .withColumn('month', F.month('start_time')) \
                    .withColumn('weekday', F.dayofweek('start_time').cast("string"))
    return time_table

In [42]:
# process and check
time_table = create_time_table(df_log)
time_table.show(20)

+--------------------+----+---+----+-----+----+-------+
|          start_time|hour|day|week|month|year|weekday|
+--------------------+----+---+----+-----+----+-------+
|2018-11-01 21:01:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:05:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:08:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:11:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:17:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:24:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:28:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:42:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:52:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 21:55:...|  21|  1|  44|   11|2018|      5|
|2018-11-01 22:23:...|  22|  1|  44|   11|2018|      5|
|2018-11-02 01:25:...|   1|  2|  44|   11|2018|      6|
|2018-11-02 01:30:...|   1|  2|  44|   11|2018|      6|
|2018-11-02 01:34:...|   1|  2|  44|   11|2018|      6|
|2018-11-02 02:42:...|   2|  2|  44|   11|2018| 

In [28]:
time_table.collect()
parquet_path = 'output/time_table'
write_parquet_time(time_table, parquet_path)
# check_parquet(parquet_path)

### Create the songplays fact table

* bigint: songplay_id, user_id
* timestamp : start_time
* varchar : level, song_id, artist_id, session_id, location, user_agent
* int: month, year
* NOT NULL : start_time, user_id, level, session_id



In [29]:
def create_songplays_table(tl, ts):
    tl = df_log.alias('tl')
    ts = df_song.alias('ts')
    
    inner_join = tl.join(ts, ((tl.artist == ts.artist_name) & (tl.artist == ts.artist_name)), how='inner')
    
    songplays = inner_join \
            .withColumn("songplay_id", monotonically_increasing_id()) \
            .filter('formated_ts != ""' and 'userId != ""' and 'level != ""' and 'sessionId != ""')
    
    songplays_table = songplays \
                    .selectExpr("songplay_id",
                                    "formated_ts as start_time",
                                    "cast(userId as Long) user_id",
                                    "level",
                                    "song_id",
                                    "artist_id",
                                    "sessionId as session_id",
                                    "location",
                                    "userAgent as user_agent") \
                    .sort('songplay_id') \
                    .withColumn('year', F.year('start_time')) \
                    .withColumn('month', F.month('start_time'))

    return songplays_table

In [30]:
# process and check
songplays_table = create_songplays_table(df_log, df_song)
songplays_table.show(5)

+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
|songplay_id|          start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|year|month|
+-----------+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
|          0|2018-11-15 16:55:...|     42| paid|SONRWUU12AF72A4283|ARGE7G11187FB37E05|       404|New York-Newark-J...|"Mozilla/5.0 (Win...|2018|   11|
|          1|2018-11-21 05:30:...|     97| paid|SONRWUU12AF72A4283|ARGE7G11187FB37E05|       797|Lansing-East Lans...|"Mozilla/5.0 (X11...|2018|   11|
|          2|2018-11-28 16:54:...|     14| free|SOIGHOD12A8C13B5A1|ARY589G1187B9A9F4E|       929|       Red Bluff, CA|Mozilla/5.0 (Wind...|2018|   11|
|          3|2018-11-05 02:21:...|     44| paid|SONRWUU12AF72A4283|ARGE7G11187FB37E05|       2

In [31]:
songplays_table.collect()
parquet_path = 'output/songplays_table'
write_parquet_time(songplays_table, parquet_path)
# check_parquet(parquet_path)

In [32]:
# check data in each table and the parquets files
%run -i '2_checkData.py'

+------------------+-----+
|song_id           |count|
+------------------+-----+
|SOABWAP12A8C13F82A|1    |
|SOAFBCP12A8C13CC7D|1    |
+------------------+-----+
only showing top 2 rows

Not Null with filter: 24 
Null with filter: 0 
+--------------------------------+-----+
|title                           |count|
+--------------------------------+-----+
|A Poor Recipe For Civic Cohesion|1    |
|Burn My Body (Album Version)    |1    |
+--------------------------------+-----+
only showing top 2 rows

Not Null with filter: 24 
Null with filter: 0 
+------------------+-----+
|artist_id         |count|
+------------------+-----+
|AR0MWD61187B9B2B12|1    |
|AR10USD1187B99F3F1|1    |
+------------------+-----+
only showing top 2 rows

Not Null with filter: 24 
Null with filter: 0 
Numbers of rows in songs_table : 24
Not Null song_id with filter: 24 
Null with song_id filter: 0 
Not Null title with filter: 24 
Null with title filter: 0 
Not Null artist_id with filter: 24 
Null with artist_id 

+------+-----+
|userId|count|
+------+-----+
|49    |689  |
|80    |665  |
|97    |557  |
|15    |463  |
|44    |397  |
+------+-----+
only showing top 5 rows

Not Null userId with filter: 6820 
Null userId with filter: 0 
+-----+-----+
|level|count|
+-----+-----+
|paid |5591 |
|free |1229 |
+-----+-----+

Not Null level with filter: 6820 
Null level with filter: 0 
Numbers of rows in users_table : 96
Not Null user_idwith filter: 96 
Null with user_id filter: 0 
total 4.0K
-rw-r--r-- 1 anthelix users    0 Mar 23 10:30 _SUCCESS
drwxr-xr-x 3 anthelix users 4.0K Mar 23 10:30 year=2018
Parquet Files: 18
DataFrame rows: 10
DataFrame schema: DataFrame[songplay_id: bigint, start_time: timestamp, user_id: bigint, level: string, song_id: string, artist_id: string, session_id: string, location: string, user_agent: string, year: int, month: int]
+-----------+-----------------------+-------+-----+------------------+------------------+----------+-------------------------------------+---------------

## Business Questions

* What are the busiest days of the week?
* What are the busiest times of the day?
* What are the top selling bakery items?

In [33]:
from math import pi
from bokeh.io import output_notebook, show
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource
from bokeh.transform import factor_cmap, cumsum
from bokeh.palettes import Paired12

output_notebook()

### What are the busiest days of the week?

In [34]:
songplays_table.createOrReplaceTempView('tmp_songplays')
songs_table.createOrReplaceTempView('tmp_song')

In [35]:
sql_query = "SELECT date_format(start_time, 'EEEE') as day, count(*) as count " \
            "FROM tmp_songplays " \
            "WHERE session_id NOT LIKE 'NONE' AND session_id NOT LIKE 'Adjustment' " \
            "GROUP BY day " \
            "ORDER BY count DESC " \
            "LIMIT 10"
df4 = spark.sql(sql_query)
df4.show(10, False)

+---------+-----+
|day      |count|
+---------+-----+
|Tuesday  |3    |
|Wednesday|2    |
|Friday   |2    |
|Monday   |2    |
|Thursday |1    |
+---------+-----+



In [36]:
data = df4.toPandas()
tooltips = [('day', '@day'), ('count', '@{count}{,}')]
days = data['day'].tolist()
color_map = factor_cmap(field_name='day', palette=Paired12, factors=days)

data['angle'] = data['count'] / data['count'].sum() * 2 * pi
plot = figure(plot_height=450,
              plot_width=700,
              title='Items Sold/Day',
              tooltips=tooltips,
              x_range=(-0.5, 1.0))
plot.wedge(x=0,
           y=1,
           radius=0.4,
           start_angle=cumsum('angle', include_zero=True),
           end_angle=cumsum('angle'),
           line_color='white',
           fill_color=color_map,
           legend_field='day',
           source=data)
plot.axis.axis_label = None
plot.axis.visible = False
plot.grid.grid_line_color = None

show(plot)

### What are the busiest times of the day?

In [37]:
def time_increment(h, m):
    """Calculates a 30-minute time increment

    Parameters:
    h (str): hours, '0' or '00' to '23'
    m (str): minutes, '0' or '00' to '59'

    Returns:
    str: 30-minute time increment, i.e. '07:30', '23:00', or '12:00'

    """

    increment = (int(m) * (100 / 60)) / 100  # 0.0000 - 0.9833
    increment = round(increment, 0)  # 0.0 or 1.0
    increment = int(increment) * 30  # 0 or 30
    increment = str(h).rjust(2, '0') + ':' + str(increment).rjust(2, '0')

    return increment  # i.e. '07:30' or '23:00'


spark.udf.register("udfTimeIncrement", time_increment, StringType())


sql_query = "WITH tmp_table AS (" \
            "  SELECT udfTimeIncrement(date_format(start_time, 'HH'), date_format(start_time, 'mm')) as period, count(*) as count " \
            "  FROM tmp_songplays " \
            "  WHERE user_id NOT LIKE 'NONE' AND user_id NOT LIKE 'Adjustment' " \
            "  GROUP BY period " \
            "  ORDER BY period ASC" \
            ") " \
            "SELECT period, count " \
            "FROM tmp_table " \
            "WHERE period BETWEEN '01:00' AND '23:59'"

df5 = spark.sql(sql_query)
df5.show(10, False)

+------+-----+
|period|count|
+------+-----+
|02:00 |1    |
|05:00 |1    |
|13:00 |1    |
|15:30 |1    |
|16:30 |3    |
|17:00 |1    |
|20:00 |1    |
+------+-----+



In [45]:
source = ColumnDataSource(data=df5.toPandas())
tooltips = [('period', '@period'), ('count', '@{count}{,}')]
periods = source.data['period'].tolist()
plot = figure(x_range=periods,
              plot_width=900,
              plot_height=450,
              min_border=0,
              tooltips=tooltips)
plot.vbar(x='period', bottom=0, top='count', source=source, width=0.9)
plot.title.text = 'Users Connection/Hour'
plot.xaxis.axis_label = 'Hour of the Day'
plot.yaxis.axis_label = 'Total Users Connection'

show(plot)

### What are the top song_id?


In [39]:
sql_query = "SELECT tsg.title, count(*) as count " \
            "FROM tmp_songplays AS tss " \
            "JOIN tmp_song AS tsg "  \
            "ON tsg.song_id=tss.song_id " \
            "WHERE tss.song_id NOT LIKE 'NONE' AND tss.song_id NOT LIKE 'Adjustment' " \
            "GROUP BY tsg.title " \
            "ORDER BY count DESC " \
            "LIMIT 10"

df6 = spark.sql(sql_query)
df6.show(10, False)

+----------------------------------+-----+
|title                             |count|
+----------------------------------+-----+
|Into The Nightlife                |4    |
|It's About Time                   |2    |
|Indian Angel                      |1    |
|The Last Beat Of My Heart (b-side)|1    |
|Face the Ashes                    |1    |
|Scream                            |1    |
+----------------------------------+-----+



In [40]:
source = ColumnDataSource(data=df6.toPandas())
tooltips = [('title', '@title'), ('count', '@{count}{,}')]
items = source.data['title'].tolist()
items.reverse()
plot = figure(y_range=items,
              plot_width=750,
              plot_height=375,
              min_border=0,
              tooltips=tooltips)
plot.hbar(y='title', right='count', height=.9, source=source)
plot.title.text = 'Top title'
plot.yaxis.axis_label = 'title'
plot.xaxis.axis_label = 'Total title listen'

show(plot)

In [41]:
songplays_table.createOrReplaceTempView('tmp_songplays')

df2 = spark.sql("SELECT DATE(start_time) AS date, count(*) as count " + "FROM tmp_songplays " +
                "GROUP BY date " + "ORDER BY count")

print('DataFrame rows: %d' % df2.count())

df3 = df2.withColumn("hourly_period", df2['date'].substr(1, 2))

print(df3.show(10))

DataFrame rows: 10
+----------+-----+-------------+
|      date|count|hourly_period|
+----------+-----+-------------+
|2018-11-16|    1|           20|
|2018-11-05|    1|           20|
|2018-11-15|    1|           20|
|2018-11-21|    1|           20|
|2018-11-28|    1|           20|
|2018-11-30|    1|           20|
|2018-11-06|    1|           20|
|2018-11-27|    1|           20|
|2018-11-19|    1|           20|
|2018-11-20|    1|           20|
+----------+-----+-------------+

None
