### Data lake tests with .zip file given to Udacity

In [1]:
from zipfile import ZipFile
import numpy as np
import pandas as pd

In [2]:
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType, FloatType, TimestampType, LongType

import datetime #Required for ts conversion
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
from pyspark.sql.functions import year, month, dayofmonth, hour, minute, second

In [3]:
pd.set_option('max_colwidth', 200)

In [4]:
spark = SparkSession \
    .builder \
    .appName("DataLake - Local") \
    .getOrCreate()

In [5]:
spark.sparkContext.getConf().getAll()

[('spark.driver.port', '45883'),
 ('spark.app.id', 'local-1572274915393'),
 ('spark.driver.host', '5b13c09a6e08'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'DataLake - Local'),
 ('spark.ui.showConsoleProgress', 'true')]

In [6]:
spark

## Gather

In [7]:
# Create a ZipFile Object and load sample.zip in it
with ZipFile('data/log-data.zip', 'r') as zipObj:
    # Extract all the contents of zip file in current directory
    zipObj.extractall('data/log-data/')
    
with ZipFile('data/song-data.zip', 'r') as zipObj:
    # Extract all the contents of zip file in current directory
    zipObj.extractall('data/song-data/')

In [8]:
path = "data/log-data"
log_df = spark.read.json(path)

In [9]:
path = "data/song-data/song_data/*/*/*/*.json"
#path = "data/song-data/song_data/A/A/A/TRAAAAW128F429D538.json"
song_schema = StructType([
	StructField("num_songs", IntegerType()),
    StructField("artist_id", StringType()),
    StructField("artist_latitude", FloatType()),
    StructField("artist_longitude", FloatType()),
    StructField("artist_location", StringType()),
    StructField("artist_name", StringType()),
    StructField("song_id", StringType()),
    StructField("title", StringType()),
    StructField("duration", FloatType()),
    StructField("year", IntegerType())
])

song_df = spark.read.json(path, 
          schema=song_schema)
#song_df = spark.read.schema(song_schema).json(path)

## Assess

In [10]:
log_df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [11]:
log_df.count()

8056

In [12]:
log_df.show(2)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [13]:
# Checking auth column (Logged Out, Logged In)
log_df.select("auth").dropDuplicates().show()

+----------+
|      auth|
+----------+
|Logged Out|
| Logged In|
+----------+



In [14]:
# Checking gender column ( replace null with blank)
log_df.groupby(log_df.gender).count().show()

+------+-----+
|gender|count|
+------+-----+
|     F| 5482|
|  null|  286|
|     M| 2288|
+------+-----+



In [15]:
# iteminsession
log_df.select("iteminsession").dropDuplicates().show()

+-------------+
|iteminsession|
+-------------+
|           26|
|           29|
|           65|
|           19|
|           54|
|            0|
|          112|
|          113|
|           22|
|            7|
|           77|
|           34|
|          126|
|           50|
|           94|
|          110|
|           57|
|           32|
|           43|
|           84|
+-------------+
only showing top 20 rows



In [16]:
log_df.select("length").dropDuplicates().show()

+---------+
|   length|
+---------+
|375.03955|
|169.35138|
| 325.0673|
|231.94077|
|347.81995|
| 64.67873|
|241.94567|
|282.04363|
|438.67383|
|137.84771|
|227.83955|
|213.75955|
|262.79138|
|317.30893|
|216.47628|
| 229.0673|
|280.34567|
|334.75873|
|288.57424|
|377.36444|
+---------+
only showing top 20 rows



In [17]:
log_df.select("level").dropDuplicates().show()

+-----+
|level|
+-----+
| free|
| paid|
+-----+



In [18]:
log_df.select("page").dropDuplicates().show()

+----------------+
|            page|
+----------------+
|Submit Downgrade|
|            Home|
|       Downgrade|
|          Logout|
|   Save Settings|
|           About|
|        Settings|
|           Login|
|        NextSong|
|            Help|
|         Upgrade|
|           Error|
|  Submit Upgrade|
+----------------+



In [19]:
log_df.select("registration").dropDuplicates().show()

+-----------------+
|     registration|
+-----------------+
|1.540472624796E12|
|1.540810448796E12|
|1.541016707796E12|
|1.540823606796E12|
|1.541064343796E12|
|1.540130971796E12|
|1.540676534796E12|
|1.540992766796E12|
|1.540644861796E12|
|1.540907087796E12|
|1.541020249796E12|
|1.540991795796E12|
|1.541003367796E12|
|1.541033612796E12|
|1.541091973796E12|
|1.540306145796E12|
|1.540492941796E12|
|             null|
|1.540856629796E12|
|1.540970748796E12|
+-----------------+
only showing top 20 rows



In [20]:
log_df.select("sessionId").dropDuplicates().show()

+---------+
|sessionId|
+---------+
|      964|
|      474|
|      558|
|      541|
|     1010|
|      418|
|      191|
|      222|
|      730|
|      270|
|      938|
|      293|
|      442|
|      720|
|      705|
|      243|
|      278|
|      296|
|      926|
|      965|
+---------+
only showing top 20 rows



In [21]:
log_df.select("userid").dropDuplicates().show()

+------+
|userid|
+------+
|    51|
|     7|
|    15|
|    54|
|   101|
|    11|
|    29|
|    69|
|    42|
|    73|
|    87|
|    64|
|     3|
|    30|
|    34|
|    59|
|     8|
|    28|
|    22|
|    85|
+------+
only showing top 20 rows



In [22]:
#gender_count = log_df.select(["gender"])
#gender_count.groupby(gender_count.gender).count().show()
#gc2 = gender_count.replace('None', None).na.fill('')
#gc2.groupby(gc2.gender).count().show()
#gc2.filter(gc2.gender == '').show(5)

log_df_copy = log_df.replace('None', None).na.fill('')
log_df_copy.filter(log_df_copy.gender == '').select("auth").dropDuplicates().show()

+----------+
|      auth|
+----------+
|Logged Out|
+----------+



In [23]:
log_df.groupby(log_df.gender).count().show()

+------+-----+
|gender|count|
+------+-----+
|     F| 5482|
|  null|  286|
|     M| 2288|
+------+-----+



In [24]:
song_df.printSchema()

root
 |-- num_songs: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: float (nullable = true)
 |-- artist_longitude: float (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: float (nullable = true)
 |-- year: integer (nullable = true)



In [25]:
song_df.count()

71

In [26]:
song_df.show(5)

+---------+------------------+---------------+----------------+-----------------+--------------------+------------------+--------------------+---------+----+
|num_songs|         artist_id|artist_latitude|artist_longitude|  artist_location|         artist_name|           song_id|               title| duration|year|
+---------+------------------+---------------+----------------+-----------------+--------------------+------------------+--------------------+---------+----+
|        1|ARDR4AC1187FB371A1|           null|            null|                 |Montserrat Caball...|SOBAYLL12A8C138AF9|Sono andati? Fing...|511.16364|   0|
|        1|AREBBGV1187FB523D2|           null|            null|      Houston, TX|Mike Jones (Featu...|SOOLYAZ12A6701F4A6|Laws Patrolling (...|173.66159|   0|
|        1|ARMAC4T1187FB3FA4C|       40.82624|       -74.47995|Morris Plains, NJ|The Dillinger Esc...|SOBBUGU12A8C13E95D|Setting Fire to S...|207.77751|2004|
|        1|ARPBNLO1187FB3D52F|       40.71455|      

In [27]:
song_df.select("artist_latitude").dropDuplicates().show()

+---------------+
|artist_latitude|
+---------------+
|       56.27609|
|       35.14968|
|       36.16778|
|       49.80388|
|        -13.442|
|       34.31109|
|       27.94017|
|           null|
|       40.71455|
|        40.7038|
|       51.50632|
|       41.88415|
|       30.08615|
|       39.49974|
|       35.21962|
|       40.99471|
|       37.83721|
|        38.8991|
|       37.16793|
|       32.67828|
+---------------+
only showing top 20 rows



In [28]:
song_df.select("artist_longitude").dropDuplicates().show()

+----------------+
|artist_longitude|
+----------------+
|       -94.10158|
|       -83.22295|
|       -87.63241|
|      -122.42005|
|       -77.60454|
|       -86.77836|
|       -73.96644|
|            null|
|       -73.94512|
|        -41.9952|
|         9.51695|
|       -81.37739|
|       -80.11278|
|       -82.32547|
|       -74.47995|
|       -80.01955|
|       -90.04892|
|        -0.12714|
|       -94.02978|
|       -79.38533|
+----------------+
only showing top 20 rows



In [29]:
song_df.select("artist_location").dropDuplicates().show()

+---------------+
|artist_location|
+---------------+
|Gainesville, FL|
|           Utah|
|     Wisner, LA|
|     Queens, NY|
|           Ohio|
|      Dubai UAE|
| Nashville, TN.|
| Zagreb Croatia|
|   Pennsylvania|
| Hamilton, Ohio|
|    Houston, TX|
|  United States|
|      Noci (BA)|
|     Washington|
|        Chicago|
|California - LA|
|        Denmark|
|  Hamtramck, MI|
|    Chicago, IL|
|         Panama|
+---------------+
only showing top 20 rows



In [30]:
song_df.select("artist_name").dropDuplicates().show()

+--------------------+
|         artist_name|
+--------------------+
|        David Martin|
|                 Clp|
|          Bitter End|
|         King Curtis|
|        The Box Tops|
|          John Davis|
|        Gwen Stefani|
|          Faye Adams|
|     Richard Souther|
|                Jinx|
|  Luna Orbit Project|
|      Five Bolt Main|
|Mike Jones (Featu...|
|       Terry Callier|
|    Billie Jo Spears|
|     Christos Dantis|
|Kenny G featuring...|
|         Mitch Ryder|
|         Lupe Fiasco|
|       Faiz Ali Faiz|
+--------------------+
only showing top 20 rows



### Quality
1. song_df : Replace artist_latitude & artist_longitude null with 0

### Tidiness
1. log_df : Filter only column page with value "NextSong"
2. log_df : Convert ts from long to datetime
3. log_df : Convert registration from double to long
4. Create below Fact & Dimension tables 
  * create songplays table with columns songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
  * create users table with columns user_id, first_name, last_name, gender, level
  * create songs table with columns song_id, title, artist_id, year, duration
  * create artists table with columns artist_id, name, location, lattitude, longitude
  * create time table with columns start_time, hour, day, week, month, year, weekday

## Clean

In [31]:
# Make copies of files
log_clean = log_df
song_clean = song_df

print('log_clean = ',log_clean.count())
print('song_clean = ',song_clean.count())

log_clean =  8056
song_clean =  71


### Issue 1 : In log_df Keep column page with value "NextSong" only
### Define
1. log_df : Filter only column page with value "NextSong"
    
### Code

In [32]:
log_clean.filter(log_clean.page == 'NextSong').count()

6820

In [33]:
log_clean = log_clean.filter(log_clean.page == 'NextSong')

### Test

In [34]:
print('log_clean = ',log_clean.count())
log_clean.select("page").dropDuplicates().show()

log_clean =  6820
+--------+
|    page|
+--------+
|NextSong|
+--------+



### Issue 2 : Convert log_df ts column from long to timestamp
### Define
2. log_df : Convert ts from long to datetime

### Code

In [35]:
log_clean.select('ts').show(5)

+-------------+
|           ts|
+-------------+
|1542241826796|
|1542242481796|
|1542242741796|
|1542253449796|
|1542260935796|
+-------------+
only showing top 5 rows



In [36]:
# Method 1
convert_ts = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0), TimestampType())
log_clean_copy = log_clean.withColumn("ts_converted", convert_ts(log_clean.ts))

# Method 2
#from pyspark.sql import functions as F
#log_clean.select(['ts', F.from_unixtime(F.col('ts')/1000)] ).show()

In [37]:
# Method 3
'''
from datetime import datetime

log_clean_copy = log_clean

@udf(t.StringType())
def get_datetime(ts):
    return datetime.fromtimestamp(ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')

log_clean_copy = log_clean_copy.withColumn("current_ts", get_datetime("ts"))
'''

'\nfrom datetime import datetime\n\nlog_clean_copy = log_clean\n\n@udf(t.StringType())\ndef get_datetime(ts):\n    return datetime.fromtimestamp(ts / 1000.0).strftime(\'%Y-%m-%d %H:%M:%S\')\n\nlog_clean_copy = log_clean_copy.withColumn("current_ts", get_datetime("ts"))\n'

In [38]:
log_clean = log_clean_copy

### Test

In [39]:
log_clean.select(['ts', 'ts_converted']).show(5)

+-------------+--------------------+
|           ts|        ts_converted|
+-------------+--------------------+
|1542241826796|2018-11-15 00:30:...|
|1542242481796|2018-11-15 00:41:...|
|1542242741796|2018-11-15 00:45:...|
|1542253449796|2018-11-15 03:44:...|
|1542260935796|2018-11-15 05:48:...|
+-------------+--------------------+
only showing top 5 rows



### Issue 3 : In log_df, convert registration from double to long
### Define
log_df : Convert registration from double to long

### Code

In [40]:
log_clean.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- ts_converted: timestamp (nullable = true)



In [41]:
log_clean.select('registration').show(2, truncate=False)

+-----------------+
|registration     |
+-----------------+
|1.541016707796E12|
|1.541016707796E12|
+-----------------+
only showing top 2 rows



In [42]:
log_clean_copy = log_clean.withColumn("registration_converted", log_clean.registration.cast(LongType()) )

In [43]:
log_clean = log_clean_copy

### Test

In [44]:
log_clean.select(['registration', 'registration_converted']).show(5)

+-----------------+----------------------+
|     registration|registration_converted|
+-----------------+----------------------+
|1.541016707796E12|         1541016707796|
|1.541016707796E12|         1541016707796|
|1.541016707796E12|         1541016707796|
|1.540492941796E12|         1540492941796|
|1.540794356796E12|         1540794356796|
+-----------------+----------------------+
only showing top 5 rows



In [45]:
log_clean.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- ts_converted: timestamp (nullable = true)
 |-- registration_converted: long (nullable = true)



### Issue 4 : Replace artist_latitude & artist_longitude null with 0
### Define
song_df : Replace artist_latitude & artist_longitude null with 0
    
### Code

In [46]:
song_clean_copy = song_clean

In [47]:
song_clean_copy = song_clean_copy.fillna({'artist_latitude':0})
song_clean_copy = song_clean_copy.fillna({'artist_longitude':0})

### Test

In [48]:
#song_clean_copy.select("artist_latitude").dropDuplicates().show()
song_clean_copy.filter(song_clean_copy.artist_latitude == 0).show(2)
song_clean_copy.filter(song_clean_copy.artist_longitude == 0).show(2)

+---------+------------------+---------------+----------------+---------------+--------------------+------------------+--------------------+---------+----+
|num_songs|         artist_id|artist_latitude|artist_longitude|artist_location|         artist_name|           song_id|               title| duration|year|
+---------+------------------+---------------+----------------+---------------+--------------------+------------------+--------------------+---------+----+
|        1|ARDR4AC1187FB371A1|            0.0|             0.0|               |Montserrat Caball...|SOBAYLL12A8C138AF9|Sono andati? Fing...|511.16364|   0|
|        1|AREBBGV1187FB523D2|            0.0|             0.0|    Houston, TX|Mike Jones (Featu...|SOOLYAZ12A6701F4A6|Laws Patrolling (...|173.66159|   0|
+---------+------------------+---------------+----------------+---------------+--------------------+------------------+--------------------+---------+----+
only showing top 2 rows

+---------+------------------+---------

### Issue 5 : Create Fact & Dimension tables
### Define
* create songplays table with columns songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
* create users table with columns user_id, first_name, last_name, gender, level
* create songs table with columns song_id, title, artist_id, year, duration
* create artists table with columns artist_id, name, location, lattitude, longitude
* create time table with columns start_time, hour, day, week, month, year, weekday

### Code

In [49]:
# Creating time table with columns start_time, hour, day, week, month, year, weekday
log_clean.select(['ts_converted'
                  , dayofmonth("ts_converted").alias('day')
                  , month("ts_converted").alias('month')
                  , year("ts_converted").alias('year')
                  , hour("ts_converted").alias('hour')
                  , minute("ts_converted").alias('minute')
                  , second("ts_converted").alias('second')
                 ]).show(5)

+--------------------+---+-----+----+----+------+------+
|        ts_converted|day|month|year|hour|minute|second|
+--------------------+---+-----+----+----+------+------+
|2018-11-15 00:30:...| 15|   11|2018|   0|    30|    26|
|2018-11-15 00:41:...| 15|   11|2018|   0|    41|    21|
|2018-11-15 00:45:...| 15|   11|2018|   0|    45|    41|
|2018-11-15 03:44:...| 15|   11|2018|   3|    44|     9|
|2018-11-15 05:48:...| 15|   11|2018|   5|    48|    55|
+--------------------+---+-----+----+----+------+------+
only showing top 5 rows



In [50]:
time_table = log_clean.select(['ts_converted'])\
                    .withColumnRenamed('ts_converted','start_time') 

time_table = time_table.withColumn('day', F.dayofmonth('start_time')) \
                      .withColumn('month', F.month('start_time')) \
                      .withColumn('year', F.year('start_time')) \
                      .withColumn('hour', F.hour('start_time')) \
                      .withColumn('minute', F.minute('start_time')) \
                      .withColumn('second', F.second('start_time')) \
                      .withColumn('week', F.weekofyear('start_time')) \
                      .withColumn('weekday', F.dayofweek('start_time')).dropDuplicates()

time_table.show(5)

+--------------------+---+-----+----+----+------+------+----+-------+
|          start_time|day|month|year|hour|minute|second|week|weekday|
+--------------------+---+-----+----+----+------+------+----+-------+
|2018-11-15 11:10:...| 15|   11|2018|  11|    10|     1|  46|      5|
|2018-11-15 16:49:...| 15|   11|2018|  16|    49|    50|  46|      5|
|2018-11-21 20:55:...| 21|   11|2018|  20|    55|     9|  47|      4|
|2018-11-14 11:45:...| 14|   11|2018|  11|    45|    58|  46|      4|
|2018-11-28 15:22:...| 28|   11|2018|  15|    22|    39|  48|      4|
+--------------------+---+-----+----+----+------+------+----+-------+
only showing top 5 rows



In [51]:
log_clean.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+----------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|        ts_converted|registration_converted|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+--------------------+----------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|       Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-1

In [52]:
# creating users table with columns user_id, first_name, last_name, gender, level
users_table = log_clean.select(['userId', 'firstName', 'lastName', 'gender', 'level'])\
        .withColumnRenamed('userId', 'user_id')\
        .withColumnRenamed('firstName', 'first_name')\
        .withColumnRenamed('lastName', 'last_name').dropDuplicates()

users_table.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     26|      Ryan|    Smith|     M| free|
|      7|    Adelyn|   Jordan|     F| free|
|     71|    Ayleen|     Wise|     F| free|
|     81|    Sienna|    Colon|     F| free|
|     87|    Dustin|      Lee|     M| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [53]:
log_clean.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- ts_converted: timestamp (nullable = true)
 |-- registration_converted: long (nullable = true)



In [54]:
log_clean.show(2)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+--------------------+----------------------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|        ts_converted|registration_converted|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+--------------------+----------------------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|2018-11-15 00:30:...|        

In [63]:
log_clean.select('artist').show(20)

+--------------------+
|              artist|
+--------------------+
|            Harmonia|
|         The Prodigy|
|               Train|
|         Sony Wonder|
|           Van Halen|
|           Magic Sam|
|Edward Sharpe & T...|
|Usher featuring w...|
|         Helen Reddy|
|        Taylor Swift|
|           Sean Paul|
|         Soundgarden|
|         The Killers|
|       Amy Winehouse|
|      Steve Anderson|
|          Rob Zombie|
|  Deadmau5 & Kaskade|
|        Shania Twain|
|      Los Campesinos|
|            Ill Nino|
+--------------------+
only showing top 20 rows



In [55]:
song_clean.printSchema()

root
 |-- num_songs: integer (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: float (nullable = true)
 |-- artist_longitude: float (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- duration: float (nullable = true)
 |-- year: integer (nullable = true)



In [56]:
# creating songs table with columns song_id, title, artist_id, year, duration
songs_table = song_clean.select(['song_id', 'title', 'artist_id', 'year', 'duration']).dropDuplicates()
songs_table.show(5)

+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOQVMXR12A81C21483|         Salt In NYC|ARKULSX1187FB45F84|   0|424.12363|
|SOYMRWW12A6D4FAB14|The Moon And I (O...|ARKFYS91187B98E58F|   0| 267.7024|
|SOMZWCG12A8C13C480|    I Didn't Mean To|ARD7TVE1187B99BFB1|   0| 218.9318|
|SOZHPGD12A8C1394FE|     Baby Come To Me|AR9AWNF1187B9AB0B4|   0|236.93016|
|SOFSOCN12A8C143F5D|      Face the Ashes|ARXR32B1187FB57099|2007|209.60608|
+------------------+--------------------+------------------+----+---------+
only showing top 5 rows



In [57]:
# creating artists table with columns artist_id, name, location, latitude, longitude
artists_table = song_clean.select(['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude'])\
                .withColumnRenamed('artist_name','name') \
                .withColumnRenamed('artist_location','location') \
                .withColumnRenamed('artist_latitude','latitude') \
                .withColumnRenamed('artist_longitude','longitude').dropDuplicates()

artists_table.show(5)

+------------------+---------------+--------------------+--------+---------+
|         artist_id|           name|            location|latitude|longitude|
+------------------+---------------+--------------------+--------+---------+
|ARXR32B1187FB57099|            Gob|                    |    null|     null|
|AROGWRA122988FEE45|Christos Dantis|                    |    null|     null|
|AREDL271187FB40F44|   Soul Mekanik|                    |    null|     null|
|AROUOZZ1187B9ABE51|    Willie Bobo|New York, NY [Spa...|40.79195|-73.94512|
|ARGSAFR1269FB35070|     Blingtones|                    |    null|     null|
+------------------+---------------+--------------------+--------+---------+
only showing top 5 rows



In [60]:
# Creating songplays table with columns songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
songs_table.createOrReplaceTempView('songs')
users_table.createOrReplaceTempView('users')
artists_table.createOrReplaceTempView('artists')
time_table.createOrReplaceTempView('time')
log_clean.createOrReplaceTempView('log_clean')
song_clean.createOrReplaceTempView('song_clean')

In [89]:
songplays_table = spark.sql("""
select  t.start_time, l.userId as user_id, l.level, s.song_id, a.artist_id, l.sessionId as session_id, l.location, l.userAgent as user_agent
, t.year, t.month
from log_clean l join time t
on l.ts_converted = t.start_time
join artists a
on l.artist = a.name
join songs s
on l.song = s.title
""")

In [90]:
# A column that generates monotonically increasing 64-bit integers.
# The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.

# create a monotonically increasing id 
songplays_table = songplays_table.withColumn("idx", F.monotonically_increasing_id())

# then since the id is increasing but not consecutive, it means you can sort by it, so you can use the `row_number`
songplays_table.createOrReplaceTempView('songplays_table')
songplays_table = spark.sql("""
select row_number() over (order by "idx") as num, 
start_time, user_id, level, song_id, artist_id, session_id, location, user_agent, year, month
from songplays_table
""")

In [91]:
songplays_table.count()

1

In [92]:
songplays_table.show()

+---+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
|num|          start_time|user_id|level|           song_id|         artist_id|session_id|            location|          user_agent|year|month|
+---+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+
|  1|2018-11-21 21:56:...|     15| paid|SOZCTXZ12AB0182364|AR5KOSW1187FB35FF4|       818|Chicago-Napervill...|"Mozilla/5.0 (X11...|2018|   11|
+---+--------------------+-------+-----+------------------+------------------+----------+--------------------+--------------------+----+-----+



### Saving table files

In [None]:
# Write DF to Spark parquet file (partitioned by year and artist_id)
songs_table.write.partitionBy("year", "artist_id").parquet(songs_table_path)

songplays_table.write.parquet(f'{output_data}/songplays_table',
                                  mode='overwrite',
                                  partitionBy=['year', 'month'])

# write songplays table to parquet files partitioned by year and month
songplays_table.write.parquet(output_data+'songplays/'+'songplays.parquet', partitionBy=['year','month'])

In [95]:
# Requirement is to partition by year & artist, since songs_table contains artist_id, so instead of artist_name, artist_id is used.
songs_table.write.parquet('./output/songs.parquet', mode='overwrite', partitionBy=['year', 'artist_id'])

In [96]:
# Requirement is to partition by year & month
time_table.write.parquet('./output/time.parquet', mode='overwrite', partitionBy=['year', 'month'])

In [97]:
# Requirement is to partition by year & month
songplays_table.write.parquet('./output/songplays.parquet', mode='overwrite', partitionBy=['year', 'month'])

In [1]:
artists_table.write.parquet('./output/artists.parquet', mode='overwrite')
users_table.write.parquet('./output/users.parquet', mode='overwrite')

NameError: name 'artists_table' is not defined

In [10]:
print('hello')

hello
