In [156]:
import configparser
from datetime import datetime  
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
import pandas as pd

In [27]:
from pyspark.sql.types import DateType,TimestampType

In [2]:
import pyspark.sql.functions as F
F.monotonically_increasing_id()

In [3]:
F.monotonically_increasing_id()

AttributeError: 'NoneType' object has no attribute '_jvm'

In [7]:
!pwd

/home/workspace


In [12]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [13]:
spark = create_spark_session()
input_data = "https://udacity-dend.us-west-2.amazonaws.com/song_data"
output_data = "/home/workspace/"
# df = spark.createDataFrame(pd.read_csv("https://udacity-dend.s3.us-west-2.amazonaws.com/pagila/payment/payment.csv",delimiter=';'))

In [16]:
song_data = spark.read.option("header", "true").json("data/song_data/A/[A-B]/[A-C]/*.json")
song_data.show(5)

+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|         artist_id|artist_latitude|     artist_location|artist_longitude|         artist_name| duration|num_songs|           song_id|               title|year|
+------------------+---------------+--------------------+----------------+--------------------+---------+---------+------------------+--------------------+----+
|ARDR4AC1187FB371A1|           null|                    |            null|Montserrat Caball...|511.16363|        1|SOBAYLL12A8C138AF9|Sono andati? Fing...|   0|
|AREBBGV1187FB523D2|           null|         Houston, TX|            null|Mike Jones (Featu...|173.66159|        1|SOOLYAZ12A6701F4A6|Laws Patrolling (...|   0|
|ARMAC4T1187FB3FA4C|       40.82624|   Morris Plains, NJ|       -74.47995|The Dillinger Esc...|207.77751|        1|SOBBUGU12A8C13E95D|Setting Fire to S...|2004|
|ARPBNLO1187FB3D52F|       40.7145

In [17]:
# read song data file
song_data.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [27]:
song = song_data.select(['song_id','title','duration','year','artist_id']).dropDuplicates().collect()

In [None]:
# extract columns to create songs table
song_data.select(['song_id','title','duration','year','artist_id']).distinct()

In [None]:
# write songs table to parquet files partitioned by year and artist
songs_table.write.partitionBy("year","artist_id").parquet("songs_table.parquet")

In [36]:
# extract columns to create artists table
artists_table = song_data.select(['artist_id','artist_name','artist_location','artist_latitude', 'artist_longitude']).distinct()

In [None]:
# write artists table to parquet files
artists_table

In [98]:
# get filepath to song data file
#song_data = spark.read('/home/workspace/data/')
log_data = spark.read.option("header", "true").json("data/*.json")
log_data.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
|The Prodigy|Logged In|     Ryan|     M|            1|   Smith|260.07465| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|The Big Gundown|

In [99]:
log_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [100]:
df = log_data.filter(log_data.page=='NextSong')

In [101]:
df.select(['userId','firstName','lastName','gender','level']).distinct()

DataFrame[userId: string, firstName: string, lastName: string, gender: string, level: string]

In [102]:
song_df = spark.read.parquet('songs_table.parquet')

In [103]:
song_df = song_df.withColumnRenamed(existing='title',new='song')

In [97]:
log_data.join(song_df, on=['song'],how='left').select(['userId','level','song_id','artist_id','sessionId','location','userAgent'])

DataFrame[userId: string, level: string, song_id: string, artist_id: string, sessionId: bigint, location: string, userAgent: string]

In [218]:
get_timestamp = udf(lambda x:datetime.fromtimestamp(int(x)/1000), TimestampType())

In [219]:
get_datetime = udf(lambda x: datetime.timestamp(x), TimestampType())

In [220]:
df = log_data.withColumn("start_time", get_timestamp('ts'))
df.printSchema()
df.select(['start_time']).show(5)

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- start_time: timestamp (nullable = true)

+--------------------+
|          start_time|
+--------------------+
|2018-11-15 00:30:...|
|2018-11-15 00:41:...|
|2018-11-15 00:45:...|
|2018-11-15 01:57:...|
|2018-11-15 03:29:...|
+--------------------+
only showing top 5 rows



In [221]:
df = df.withColumn('hour', hour('start_time')) \
    .withColumn('day', dayofmonth('start_time')) \
    .withColumn('week', weekofyear('start_time')) \
    .withColumn('month', month('start_time')) \
    .withColumn('year', year('start_time')) \
    .withColumn('weekday', dayofweek('start_time')) \
    .select('ts','start_time','hour', 'day', 'week', 'month', 'year', 'weekday').drop_duplicates()

IndentationError: unexpected indent (<ipython-input-221-d5c47ca89f78>, line 2)

In [217]:
df.show(5)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+--------------------+----+---+----+-----+----+-------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|           song|status|           ts|           userAgent|userId|          start_time|hour|day|week|month|year|weekday|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+---------------+------+-------------+--------------------+------+--------------------+----+---+----+-----+----+-------+
|   Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|  Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26