In [1]:
#import all the necessary spark libraries for the sparkify project
import configparser
#from datetime import datetime
import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
# import libraries for timestamp function
from pyspark.sql.types import StringType
from pyspark.sql.functions import from_unixtime

In [2]:
# Load dl.cfg, which holds the AWS access and secret keys used for reading from and writing to S3
config = configparser.ConfigParser()
# Open the file in a context manager so the handle is closed as soon as parsing finishes
with open('dl.cfg') as config_file:
    config.read_file(config_file)

In [3]:
# Export the AWS credentials from the [AWS] section of dl.cfg as environment variables
aws_section = config['AWS']
for key_name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"):
    os.environ[key_name] = aws_section[key_name]

In [4]:
# Build (or reuse) a Spark session configured to pull in the hadoop-aws package
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

In [None]:
#### WRITE ARTISTS AND SONGS TABLES TO PARQUET FILES FROM SONG DATA FILE

In [5]:
## Build the S3 glob for the song data files (only the A/A/A subset of song_data is read)
input_data = 's3a://udacity-dend/'
song_data = os.path.join( input_data, "song_data/A/A/A/*.json")

In [6]:
## Read the song data JSON files into a Spark DataFrame
df_songsets = spark.read.json(song_data)

In [7]:
# Display the song DataFrame's column names and types
df_songsets

DataFrame[artist_id: string, artist_latitude: double, artist_location: string, artist_longitude: double, artist_name: string, duration: double, num_songs: bigint, song_id: string, title: string, year: bigint]

In [8]:
# Preview the first song record as a pandas DataFrame
df_songsets.limit(1).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARTC1LV1187B9A4858,51.4536,"Goldsmith's College, Lewisham, Lo",-0.01802,The Bonzo Dog Band,301.40036,1,SOAFBCP12A8C13CC7D,King Of Scurf (2007 Digital Remaster),1972


In [9]:
## Project the song-level columns out of the song DataFrame to form the songs table
songs_table = df_songsets.select('song_id', 'title', 'artist_id', 'year', 'duration')
songs_table

DataFrame[song_id: string, title: string, artist_id: string, year: bigint, duration: double]

In [None]:
## extract columns from dataframe to create artists table
#artists_table = df_songsets['artist_id', 'artist_name', 'artist_location', 'artist_latitude', 'artist_longitude']
#artists_table

In [10]:
# Build the artists table: keep artist_id and drop the 'artist_' prefix from the other columns
artist_renames = [
    ("artist_name", "name"),
    ("artist_location", "location"),
    ("artist_latitude", "latitude"),
    ("artist_longitude", "longitude"),
]
artists_table = df_songsets.select(
    col("artist_id"),
    *[col(source_name).alias(target_name) for source_name, target_name in artist_renames]
)

In [11]:
# Preview the artists table. Its columns were already renamed with alias() when it was selected,
# so the withColumnRenamed variant below is kept only as a commented-out alternative.
#artists_table2 = (artists_table.withColumnRenamed("artist_name", "name")
                               #.withColumnRenamed("artist_location", "location")
                               #.withColumnRenamed("artist_latitude", "latitude")
                               #.withColumnRenamed("artist_longitude", "longitude"))
#artists_table2.show(5)
artists_table.limit(2).toPandas()
#type(artists_table2)

Unnamed: 0,artist_id,name,location,latitude,longitude
0,ARTC1LV1187B9A4858,The Bonzo Dog Band,"Goldsmith's College, Lewisham, Lo",51.4536,-0.01802
1,ARA23XO1187B9AF18F,The Smithereens,"Carteret, New Jersey",40.57885,-74.21956


In [12]:
# Target location for the songs table in S3.
# The bucket 'datalake-bucket2019' must be created from the AWS console beforehand.
output_bucket = 's3a://datalake-bucket2019/'
output_songs = os.path.join(output_bucket, 'songs')
# One-time write, partitioned by year and artist_id (kept commented out here)
#songs_table.write.partitionBy('year', 'artist_id').parquet(output_songs)

In [None]:
#songs = spark.read.parquet(output_bucket)

In [None]:
#songs.columns

In [None]:
## write songs table to parquet files partitioned by year and artist
## DO NOT RUN LAST LINE IF SONGS DIRECTORY IS ALREADY CREATED !!
#output = 'datalake_output/'
#output_songs = os.path.join(output, 'songs')
#songs_table.write.partitionBy('year', 'artist_id').parquet(output_songs)

In [13]:
# Read the songs table back from its parquet files in S3 (the result is a DataFrame)
# and list its columns
songs = spark.read.parquet(output_songs)
songs.columns

['song_id', 'title', 'duration', 'year', 'artist_id']

In [14]:
# Spot-check the parquet data: show up to five songs with year after 2000, without truncating columns
songs.filter(songs["year"] > 2000).show(5, truncate=False)

+------------------+---------------------------------------------+---------+----+------------------+
|song_id           |title                                        |duration |year|artist_id         |
+------------------+---------------------------------------------+---------+----+------------------+
|SOEKAZG12AB018837E|I'll Slap Your Face (Entertainment USA Theme)|129.85424|2001|ARSVTNL1187B992A91|
|SORRNOC12AB017F52B|The Last Beat Of My Heart (b-side)           |337.81506|2004|ARSZ7L31187FB4E610|
|SOQPWCR12A6D4FB2A3|A Poor Recipe For Civic Cohesion             |118.07302|2005|AR73AIO1187B9AD57B|
|SOERIDA12A6D4F8506|I Want You (Album Version)                   |192.28689|2006|ARBZIN01187FB362CC|
|SONRWUU12AF72A4283|Into The Nightlife                           |240.63955|2008|ARGE7G11187FB37E05|
+------------------+---------------------------------------------+---------+----+------------------+
only showing top 5 rows



In [15]:
## Target location for the artists table in S3; writing artists_table there creates the 'artists' directory
### DO NOT RUN WRITE COMMAND IF ARTISTS TABLE EXISTS
#output_artists = os.path.join( output, 'artists')
output_bucket = 's3a://datalake-bucket2019/'
output_artists = os.path.join(output_bucket, 'artists')
#artists_table.write.parquet(output_artists)

In [16]:
# Read the 'artists' parquet files back into a DataFrame and list its columns
artists = spark.read.parquet(output_artists)
artists.columns

['artist_id', 'name', 'location', 'latitude', 'longitude']

In [None]:
# filter artists table where latitude is greater than 50 
#artists.filter(artists["latitude"] > 50).show(5, truncate=False)

In [None]:
#### WRITE USERS, TIME, AND SONGPLAYS TABLES TO PARQUET FILES FROM LOG DATA FILE

In [17]:
## Build the S3 glob for the log data files (all JSON files two directory levels below log_data)
input_data = 's3a://udacity-dend/'
log_data = os.path.join( input_data, "log_data/*/*/*.json")

In [18]:
## Read the log data JSON files into a Spark DataFrame
df_logsets = spark.read.json(log_data)

In [19]:
# Display the log DataFrame's column names and types
df_logsets

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: double, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

In [20]:
# Show the first row of the df_logsets Spark DataFrame
df_logsets.show(1)

+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|  artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|         song|status|           ts|           userAgent|userId|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+----

In [None]:
#df2.limit(5).toPandas()

In [21]:
# Keep only the log records that represent song plays, i.e. rows whose page is 'NextSong'
is_song_play = col("page") == 'NextSong'
df2_nextsong = df_logsets.where(is_song_play)
df2_nextsong.show(1)

+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|  artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page|     registration|sessionId|         song|status|           ts|           userAgent|userId|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+--------------------+------+
|Harmonia|Logged In|     Ryan|     M|            0|   Smith|655.77751| free|San Jose-Sunnyval...|   PUT|NextSong|1.541016707796E12|      583|Sehr kosmisch|   200|1542241826796|"Mozilla/5.0 (X11...|    26|
+--------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-----------------+---------+-------------+------+-------------+----

In [22]:
# Convert the first row of df2_nextsong to a pandas DataFrame for table display
df2_nextsong.limit(1).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26


In [23]:
# Project the user-level columns out of the NextSong rows to form the users table
users_table = df2_nextsong.select('userId', 'firstName', 'lastName', 'gender', 'level')
users_table

DataFrame[userId: string, firstName: string, lastName: string, gender: string, level: string]

In [24]:
# Rename the camelCase user columns to snake_case, then keep a single row per user_id.
# NOTE(review): nothing here controls which row survives per user_id, so the retained
# 'level' may not be the user's most recent one -- confirm this is acceptable.
users_table2 = (
    users_table
    .withColumnRenamed("userId", "user_id")
    .withColumnRenamed("firstName", "first_name")
    .withColumnRenamed("lastName", "last_name")
    .dropDuplicates(['user_id'])
)

In [25]:
# Preview five rows of the de-duplicated users table
users_table2.limit(5).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,51,Maia,Burke,F,free
1,7,Adelyn,Jordan,F,free
2,15,Lily,Koch,F,paid
3,54,Kaleb,Cook,M,free
4,101,Jayden,Fox,M,free


In [26]:
# Target location for the users table in S3 (the commented-out lines are the earlier
# local 'datalake_output' variant).
#### DO NOT RUN THE LAST (WRITE) COMMAND IF THE USERS DIRECTORY ALREADY EXISTS
#output = 'datalake_output/'
#output_users = os.path.join(output, 'users')
output_bucket = 's3a://datalake-bucket2019/'
output_users = os.path.join(output_bucket, 'users')
#users_table2.write.parquet(output_users)

In [27]:
# Read the 'users' parquet files back into a Spark DataFrame and show five rows
users = spark.read.parquet(output_users)
users.show(5)

+-------+----------+---------+------+-----+
|user_id|first_name|last_name|gender|level|
+-------+----------+---------+------+-----+
|     88|  Mohammad|Rodriguez|     M| free|
|     75|    Joseph|Gutierrez|     M| free|
|     53|   Celeste| Williams|     F| free|
|     60|     Devin|   Larson|     M| free|
|     68|    Jordan|Rodriguez|     F| free|
+-------+----------+---------+------+-----+
only showing top 5 rows



In [28]:
# Inspect the first NextSong row as a Row object
df2_nextsong.head(1)
#df2_nextsong.limit(5).toPandas()

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26')]

In [29]:
# Print the schema of the NextSong rows (column names, types, nullability)
df2_nextsong.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [30]:
# Create 'df3_nextsong' by adding a 'start_time' column (string, 'yyyy-MM-dd HH:mm:ss')
# derived from the epoch-millisecond 'ts' column.
def get_timestamp(ts_ms):
    """Convert an epoch-milliseconds column into a 'yyyy-MM-dd HH:mm:ss' string column.

    Uses Spark's built-in from_unixtime instead of a Python UDF, so rows are not
    evaluated one by one in Python and a null 'ts' produces null instead of raising
    a TypeError inside the lambda.

    NOTE(review): from_unixtime renders the time in the Spark session time zone,
    whereas the previous UDF used the Python process's local time zone -- confirm
    both are the same (e.g. UTC) in the target environment.
    """
    # from_unixtime expects whole seconds, so drop the millisecond part first.
    return from_unixtime((ts_ms / 1000).cast("long"), "yyyy-MM-dd HH:mm:ss")


df3_nextsong = df2_nextsong.withColumn("start_time", get_timestamp(df2_nextsong['ts']))
df3_nextsong

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: double, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string, start_time: string]

In [31]:
# Inspect the first row, which now carries the 'start_time' column
#import datetime 
df3_nextsong.head(1)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26', start_time='2018-11-15 00:30:26')]

In [32]:
# Same first row rendered as a pandas DataFrame for table display
df3_nextsong.limit(1).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId,start_time
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26


In [33]:
# Create 'df4_nextsong' by adding a 'date_time' column (string, 'yyyy-MM-dd') derived from
# the epoch-millisecond 'ts' column; it is the input used to compute 'weekday'.
def get_datetime(ts_ms):
    """Convert an epoch-milliseconds column into a 'yyyy-MM-dd' date string column.

    Uses Spark's built-in from_unixtime instead of a Python UDF, so rows are not
    evaluated one by one in Python and a null 'ts' produces null instead of raising
    a TypeError inside the lambda.

    NOTE(review): from_unixtime renders the date in the Spark session time zone,
    whereas the previous UDF used the Python process's local time zone -- confirm
    both are the same (e.g. UTC) in the target environment.
    """
    # from_unixtime expects whole seconds, so drop the millisecond part first.
    return from_unixtime((ts_ms / 1000).cast("long"), "yyyy-MM-dd")


df4_nextsong = df3_nextsong.withColumn("date_time", get_datetime(df3_nextsong['ts']))

In [34]:
# Inspect the first row, which now carries the 'date_time' column
df4_nextsong.head(1)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26', start_time='2018-11-15 00:30:26', date_time='2018-11-15')]

In [35]:
# Add the hour of day taken from 'start_time'
df4_nextsong = df4_nextsong.withColumn('hour', hour('start_time'))

In [36]:
# Add the day of month taken from 'start_time'
df4_nextsong = df4_nextsong.withColumn('day', dayofmonth('start_time'))

In [37]:
# Add the week of year taken from 'start_time'
df4_nextsong = df4_nextsong.withColumn('week', weekofyear('start_time'))

In [38]:
# Add the month number taken from 'start_time'
df4_nextsong = df4_nextsong.withColumn('month', month('start_time'))

In [39]:
# Add the calendar year taken from 'start_time'
df4_nextsong = df4_nextsong.withColumn('year', year('start_time'))

In [40]:
# Add the weekday as a string by formatting 'date_time' with the 'u' pattern.
# NOTE(review): 'u' is a legacy day-of-week pattern; newer Spark releases may reject it --
# confirm against the Spark version in use before upgrading.
df4_nextsong = df4_nextsong.withColumn('weekday', date_format('date_time', 'u'))

In [41]:
# Inspect the first row with all derived time columns (hour, day, week, month, year, weekday)
df4_nextsong.head(1)

[Row(artist='Harmonia', auth='Logged In', firstName='Ryan', gender='M', itemInSession=0, lastName='Smith', length=655.77751, level='free', location='San Jose-Sunnyvale-Santa Clara, CA', method='PUT', page='NextSong', registration=1541016707796.0, sessionId=583, song='Sehr kosmisch', status=200, ts=1542241826796, userAgent='"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/36.0.1985.125 Chrome/36.0.1985.125 Safari/537.36"', userId='26', start_time='2018-11-15 00:30:26', date_time='2018-11-15', hour=0, day=15, week=46, month=11, year=2018, weekday='4')]

In [42]:
# Build the time table from 'start_time' and the columns derived from it.
# NOTE(review): no de-duplication is applied here, so repeated start_time values stay
# as separate rows -- confirm whether the time table should be distinct.
time_table = df4_nextsong.select('start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday')
time_table.head(1)

[Row(start_time='2018-11-15 00:30:26', hour=0, day=15, week=46, month=11, year=2018, weekday='4')]

In [43]:
# Target location for the time table in S3; the write is partitioned by year and month
## DO NOT RUN LAST LINE IF TIME DIRECTORY IS ALREADY CREATED !!
#output = 'datalake_output/'
#output_time = os.path.join(output, 'time')
output_bucket = 's3a://datalake-bucket2019/'
output_time = os.path.join(output_bucket, 'time')
#time_table.write.partitionBy('year', 'month').parquet(output_time)

In [44]:
# Read the 'time' parquet files back into a Spark DataFrame and show five rows
time = spark.read.parquet(output_time)
time.show(5)

+-------------------+----+---+----+-------+----+-----+
|         start_time|hour|day|week|weekday|year|month|
+-------------------+----+---+----+-------+----+-----+
|2018-11-15 00:30:26|   0| 15|  46|      4|2018|   11|
|2018-11-15 00:41:21|   0| 15|  46|      4|2018|   11|
|2018-11-15 00:45:41|   0| 15|  46|      4|2018|   11|
|2018-11-15 03:44:09|   3| 15|  46|      4|2018|   11|
|2018-11-15 05:48:55|   5| 15|  46|      4|2018|   11|
+-------------------+----+---+----+-------+----+-----+
only showing top 5 rows



In [None]:
#### CREATE SONGPLAYS TABLE 

In [45]:
# Reuse the song DataFrame loaded from the song_data JSON files as the song side of the
# songplays join (it is not re-read from the parquet output here)
song_df = df_songsets
#song_df.show(1)
song_df.limit(1).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARTC1LV1187B9A4858,51.4536,"Goldsmith's College, Lewisham, Lo",-0.01802,The Bonzo Dog Band,301.40036,1,SOAFBCP12A8C13CC7D,King Of Scurf (2007 Digital Remaster),1972


In [46]:
# Rename three log columns to snake_case: userId, sessionId and userAgent
df3_nextsong = (df3_nextsong.withColumnRenamed("userId", "user_id")
                               .withColumnRenamed("sessionId", "session_id")
                               .withColumnRenamed("userAgent", "user_agent"))
df3_nextsong.limit(1).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,session_id,song,status,ts,user_agent,user_id,start_time
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26,2018-11-15 00:30:26


In [47]:
# Join condition between the log rows and the song data:
# artist name, song title and song duration must all match
cond = [df3_nextsong.artist == song_df.artist_name, df3_nextsong.song == song_df.title, df3_nextsong.length == song_df.duration]

In [48]:
# Left outer join keeps every NextSong event even when no song record matches
# (song_id and artist_id are then null); only the songplays columns are selected.
songplays_table = (
    df3_nextsong
    .join(song_df, cond, how='left_outer')
    .select(
        df3_nextsong.start_time,
        df3_nextsong.user_id,
        df3_nextsong.level,
        song_df.song_id,
        song_df.artist_id,
        df3_nextsong.session_id,
        df3_nextsong.location,
        df3_nextsong.user_agent,
    )
)
songplays_table.limit(3).toPandas()

Unnamed: 0,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
0,2018-11-15 00:30:26,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
1,2018-11-15 00:41:21,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
2,2018-11-15 00:45:41,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


In [49]:
# Add a 'songplay_id' column (monotonically increasing id, shifted by 1) and
# put the final songplays columns in order with the id first.
from pyspark.sql import functions as F

songplay_columns = [
    "songplay_id", "start_time", "user_id", "level", "song_id",
    "artist_id", "session_id", "location", "user_agent",
]
songplays_table = (
    songplays_table
    .withColumn("songplay_id", F.monotonically_increasing_id() + 1)
    .select(*songplay_columns)
)
songplays_table.limit(3).toPandas()
#songplays_table

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent
0,1,2018-11-15 00:30:26,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
1,2,2018-11-15 00:41:21,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."
2,3,2018-11-15 00:45:41,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5..."


In [50]:
# Add 'year' and 'month' columns (taken from 'start_time') to the songplays table;
# these are the columns the songplays parquet output is partitioned by.
songplays_table = (
    songplays_table
    .withColumn('year', year('start_time'))
    .withColumn('month', month('start_time'))
)
songplays_table.show(1)

+-----------+-------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+
|songplay_id|         start_time|user_id|level|song_id|artist_id|session_id|            location|          user_agent|year|month|
+-----------+-------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+
|          1|2018-11-15 00:30:26|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|2018|   11|
+-----------+-------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+
only showing top 1 row



In [51]:
## Target location for the songplays table in S3; the write is partitioned by year and month
## DO NOT RUN LAST LINE IF SONGPLAYS DIRECTORY IS ALREADY CREATED !!
#output = 'datalake_output/'
output_bucket = 's3a://datalake-bucket2019/'
output_songplays = os.path.join(output_bucket, 'songplays')
#songplays_table.write.partitionBy('year', 'month').parquet(output_songplays)

In [52]:
# Read the 'songplays' parquet files back into a DataFrame and show five rows
songplays = spark.read.parquet(output_songplays)
songplays.show(5)

+-----------+-------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+
|songplay_id|         start_time|user_id|level|song_id|artist_id|session_id|            location|          user_agent|year|month|
+-----------+-------------------+-------+-----+-------+---------+----------+--------------------+--------------------+----+-----+
|          1|2018-11-15 00:30:26|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|2018|   11|
|          2|2018-11-15 00:41:21|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|2018|   11|
|          3|2018-11-15 00:45:41|     26| free|   null|     null|       583|San Jose-Sunnyval...|"Mozilla/5.0 (X11...|2018|   11|
|          4|2018-11-15 03:44:09|     61| free|   null|     null|       597|Houston-The Woodl...|"Mozilla/5.0 (Mac...|2018|   11|
|          5|2018-11-15 05:48:55|     80| paid|   null|     null|       602|Portland-South

In [55]:
# Spot-check the parquet data: up to five songplays for user_id 26, shown as a pandas DataFrame
songplays.filter(songplays["user_id"] == 26).limit(5).toPandas()

Unnamed: 0,songplay_id,start_time,user_id,level,song_id,artist_id,session_id,location,user_agent,year,month
0,1,2018-11-15 00:30:26,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018,11
1,2,2018-11-15 00:41:21,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018,11
2,3,2018-11-15 00:45:41,26,free,,,583,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018,11
3,18,2018-11-15 07:08:36,26,free,,,607,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018,11
4,19,2018-11-15 07:12:09,26,free,,,607,"San Jose-Sunnyvale-Santa Clara, CA","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",2018,11
