In [69]:
import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format

In [2]:
config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['AWS_SECRET_ACCESS_KEY']

In [3]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [4]:
os.environ['AWS_ACCESS_KEY_ID']

'AKIA6RUVPDAPP53KNYI4'

In [5]:
spark = create_spark_session()

In [11]:
song_data = "s3a://udacity-dend/song_data/A/B/C/TRABCEI128F424C983.json"
log_data = "s3a://udacity-dend/log_data/2018/11/2018-11-01-events.json"

In [7]:
df = spark.read.json(log_data)

In [8]:
df

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: double, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

In [119]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



log data json auto schema looks fine to use without declaring them

In [10]:
df.select('artist', 'level').show(5)

+-------+-----+
| artist|level|
+-------+-----+
|   null| free|
|   null| free|
|Des'ree| free|
|   null| free|
|Mr Oizo| free|
+-------+-----+
only showing top 5 rows



In [16]:
df.filter(df.page.isin('NextSong')).show(5)

+-----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|           artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|                song|status|           ts|           userAgent|userId|
+-----------------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+--------------------+------+-------------+--------------------+------+
|          Des'ree|Logged In|   Kaylee|     F|            1| Summers|246.30812| free|Phoenix-Mesa-Scot...|   PUT|NextSong|1.540344794796E12|      139|        You Gotta Be|   200|1541106106796|"Mozilla/5.0 (Win...|     8|
|          Mr Oizo|Logged In|   Kaylee|     F|            3| Summers|144.03873| free|Phoenix-Mesa-Scot...|   PUT|Nex

In [12]:
sdf = spark.read.json(song_data)

In [13]:
sdf.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: string (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



Auto schema does seem fine, but if we are more interested in the latitude,longitude data might want to assign Double to it.

In [16]:
sdf.select('artist_id', 'title', 'song_id', 'artist_latitude', 'artist_longitude').show(5)

+------------------+-------------------+------------------+---------------+----------------+
|         artist_id|              title|           song_id|artist_latitude|artist_longitude|
+------------------+-------------------+------------------+---------------+----------------+
|ARJIE2Y1187B994AB7|Der Kleine Dompfaff|SOUPIRU12A6D4FA1E1|           null|            null|
+------------------+-------------------+------------------+---------------+----------------+



In [104]:
get_timestamp = udf(lambda x:datetime.fromtimestamp(x/1000).strftime('%Y-%m-%d %H:%M:%S'))

In [105]:
time_table = df.withColumn('datetime', get_timestamp(df.ts))

In [106]:
time_table = time_table.select('datetime')

In [107]:
time_table.head(5)

[Row(datetime='2018-11-01 20:57:10'),
 Row(datetime='2018-11-01 21:01:46'),
 Row(datetime='2018-11-01 21:01:46'),
 Row(datetime='2018-11-01 21:02:12'),
 Row(datetime='2018-11-01 21:05:52')]

In [113]:
weekday = udf(lambda x:datetime.strptime(x, '%Y-%m-%d %H:%M:%S').strftime('%a'))

In [116]:
time_table = time_table.select(col("datetime").alias("start_time"),
                               hour(col("datetime")).alias("hour"),
                               dayofmonth(col("datetime")).alias("day"), 
                               weekofyear(col("datetime")).alias("week"), 
                               month(col("datetime")).alias("month"),
                               year(col("datetime")).alias("year"),
                               weekday(col("datetime").alias("weekday")))

In [117]:
time_table.head(5)

[Row(start_time='2018-11-01 20:57:10', hour=20, day=1, week=44, month=11, year=2018, <lambda>(datetime AS `weekday`)='Thu'),
 Row(start_time='2018-11-01 21:01:46', hour=21, day=1, week=44, month=11, year=2018, <lambda>(datetime AS `weekday`)='Thu'),
 Row(start_time='2018-11-01 21:01:46', hour=21, day=1, week=44, month=11, year=2018, <lambda>(datetime AS `weekday`)='Thu'),
 Row(start_time='2018-11-01 21:02:12', hour=21, day=1, week=44, month=11, year=2018, <lambda>(datetime AS `weekday`)='Thu'),
 Row(start_time='2018-11-01 21:05:52', hour=21, day=1, week=44, month=11, year=2018, <lambda>(datetime AS `weekday`)='Thu')]

In [102]:
a = foo(1541105830796)

In [103]:
test = spark.createDataFrame([('2015-04-08 13:08:15',)], ['a'])
test.select(hour('a').alias('hour')).collect()
[Row(hour=13)]

[Row(hour=13)]

In [47]:
test.select('a').show(5)

+-------------------+
|                  a|
+-------------------+
|2015-04-08 13:08:15|
+-------------------+



In [90]:
time_table.printSchema()

root
 |-- start_time: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)



In [91]:
time_table.head(5)

[Row(start_time='2018-11-01 20:57:10', hour=20, day=1, week=44, month=11, year=2018),
 Row(start_time='2018-11-01 21:01:46', hour=21, day=1, week=44, month=11, year=2018),
 Row(start_time='2018-11-01 21:01:46', hour=21, day=1, week=44, month=11, year=2018),
 Row(start_time='2018-11-01 21:02:12', hour=21, day=1, week=44, month=11, year=2018),
 Row(start_time='2018-11-01 21:05:52', hour=21, day=1, week=44, month=11, year=2018)]

In [95]:
from datetime import date
import calendar

In [111]:
weekday = lambda x:datetime.strptime(x, '%Y-%m-%d %H:%M:%S').strftime('%a')

In [112]:
weekday(a)

'Thu'