In [1]:
!pip install path

Collecting path
  Downloading https://files.pythonhosted.org/packages/3a/33/cf93f8875e7b79f2bd5fe3c490952f4a53a8952967533e4f6af16c30455e/path-16.2.0-py3-none-any.whl
Installing collected packages: path
Successfully installed path-16.2.0


In [1]:
import configparser
from datetime import datetime
import os
from path import Path
import glob
import inspect
from typing import List

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import functions as F
from pyspark.sql import types as T

import pandas as pd
pd.set_option('display.max_columns', 200)

In [None]:
!unzip data/song-data.zip -d data/song-data/
!unzip data/log-data.zip -d data/log-data/
!mkdir data/schema

# Getting AWS kyes

In [2]:
# initializing configparser
config = configparser.ConfigParser()

# Normally this file should be in ~/.aws/credentials
if Path('~/.aws/credentials/dl.cfg').exists():
    config.read_file(open(Path('~/.aws/credentials/dl.cfg')))
    songPath = 's3a://song_data/*/*/*/*.json'
    logPath = 's3a://log_data/*.json'
    # define output paths
    output_data = 's3a://schema/'

else:
    config.read_file(open(Path('dl.cfg')))
    songPath = 'data/song-data/**/*.json'
    logPath = 'data/log-data/*.json'
    # define output paths
    output_data = 'data/schema/'

os.environ["AWS_ACCESS_KEY_ID"] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"] = config['AWS']['AWS_SECRET_ACCESS_KEY']

# Create Spark Session

In [3]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.5") \
        .getOrCreate()
    return spark

spark = create_spark_session()

In [4]:
def extract_cols(col_names: List,
                 drop_dup_cols: List,
                 df,
                 partitionBy: List = None,
                 table_name: str = '',
                 if_use_select_expr: bool = False):
    """
        It extracts columns from dataframe df
        args:
         col_names:List          :   List of columns that are needed to be extracted
         drop_dup_cols:List      :   List of columns that are needed to be unique
         df                      :   Data frame
         partitionBy:List=None   :   List of columns by which the table is partioned
         table_name:str          :   Name of the output table that needed to be saved in the paraquet format
         if_use_select_expr      :   If true it will run selectExpr command otherwise select command
    """
    # extract columns to create table
    if if_use_select_expr:
        table = df.selectExpr(*col_names).dropDuplicates(drop_dup_cols)

    else:
        table = df.select(*col_names).dropDuplicates(drop_dup_cols)
    if partitionBy is not None:
        # write table to parquet files partitioned by partioinBy column list
        table.write.parquet(Path(output_data) / Path(table_name + ".parquet"),
                            partitionBy=partitionBy,
                            mode="overwrite")
    else:
        # write table to parquet files
        table.write.parquet(Path(output_data) / Path(table_name + ".parquet"),
                            mode="overwrite")
    return table
    

# Process song data

## Song Table

In [5]:
from pyspark.sql.types import StructType, StructField, DoubleType , StringType, IntegerType, LongType, TimestampType  
song_schema = StructType([
        StructField(  "artist_id", StringType()), 
        StructField(  "artist_latitude", DoubleType()), 
        StructField(  "artist_location", StringType()), 
        StructField(  "artist_longitude", DoubleType()), 
        StructField(  "artist_name", StringType()), 
        StructField(  "duration", DoubleType()),
        StructField(  "num_songs", IntegerType()), 
        StructField(  "song_id", StringType()), 
        StructField(  "title", StringType()), 
        StructField(  "year", IntegerType())
    ])

In [6]:
# Read in the song data data/song-data/song_data/A/A/A/TRAAAAW128F429D538.json
song_data = glob.glob(songPath, recursive=True)
song_df = spark.read.json(song_data, song_schema)

In [7]:
# check the schema
song_df.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: integer (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)



In [8]:
song_df.limit(5).toPandas()

Unnamed: 0,artist_id,artist_latitude,artist_location,artist_longitude,artist_name,duration,num_songs,song_id,title,year
0,ARDR4AC1187FB371A1,,,,Montserrat Caballé;Placido Domingo;Vicente Sar...,511.16363,1,SOBAYLL12A8C138AF9,Sono andati? Fingevo di dormire,0
1,AREBBGV1187FB523D2,,"Houston, TX",,Mike Jones (Featuring CJ_ Mello & Lil' Bran),173.66159,1,SOOLYAZ12A6701F4A6,Laws Patrolling (Album Version),0
2,ARMAC4T1187FB3FA4C,40.82624,"Morris Plains, NJ",-74.47995,The Dillinger Escape Plan,207.77751,1,SOBBUGU12A8C13E95D,Setting Fire to Sleeping Giants,2004
3,ARPBNLO1187FB3D52F,40.71455,"New York, NY",-74.00712,Tiny Tim,43.36281,1,SOAOIBZ12AB01815BE,I Hold Your Hand In Mine [Live At Royal Albert...,2000
4,ARDNS031187B9924F0,32.67828,Georgia,-83.22295,Tim Wilson,186.48771,1,SONYPOM12A8C13B2D7,I Think My Wife Is Running Around On Me (Taco ...,2005


In [9]:
song_cols = ['song_id', 'title', 'artist_id', 'year', 'duration']

In [10]:
songs_table = extract_cols(col_names = song_cols, 
                           drop_dup_cols = ["song_id"], 
                           df = song_df, 
                           partitionBy = ["year", "artist_id"],
                           table_name = 'songs')

In [11]:
songs_table.limit(5).toPandas()

Unnamed: 0,song_id,title,artist_id,year,duration
0,SOGOSOV12AF72A285E,¿Dónde va Chichi?,ARGUVEV1187B98BA17,1997,313.12934
1,SOMZWCG12A8C13C480,I Didn't Mean To,ARD7TVE1187B99BFB1,0,218.93179
2,SOUPIRU12A6D4FA1E1,Der Kleine Dompfaff,ARJIE2Y1187B994AB7,0,152.92036
3,SOXVLOJ12AB0189215,Amor De Cabaret,ARKRRTF1187B9984DA,0,177.47546
4,SOWTBJW12AC468AC6E,Broken-Down Merry-Go-Round,ARQGYP71187FB44566,0,151.84934


## Artist Table

In [12]:
# artist_id, name, location, lattitude, longitude
artist_cols = ["artist_id", 
               "artist_name as name", 
               "artist_location as location", 
               "artist_latitude as latitude", 
               "artist_longitude as longitude"]

In [13]:
# extract columns to create artists table
artists_table = extract_cols(col_names = artist_cols, 
                           drop_dup_cols = ["artist_id"], 
                           df = song_df, 
                           partitionBy = None,
                           table_name = 'artists',
                           if_use_select_expr = True)


In [14]:
artists_table.printSchema()

root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [15]:
artists_table.limit(5).toPandas()

Unnamed: 0,artist_id,name,location,latitude,longitude
0,AR9AWNF1187B9AB0B4,Kenny G featuring Daryl Hall,"Seattle, Washington USA",,
1,AR0IAWL1187B9A96D0,Danilo Perez,Panama,8.4177,-80.11278
2,AR0RCMP1187FB3F427,Billie Jo Spears,"Beaumont, TX",30.08615,-94.10158
3,AREDL271187FB40F44,Soul Mekanik,,,
4,ARI3BMM1187FB4255E,Alice Stuart,Washington,38.8991,-77.029


# Process Log Data

In [16]:
input_data = 'data/'
log_data_paths = glob.glob(Path(input_data) / Path("log-data/*.json"))
print(log_data_paths)

['data/log-data/2018-11-30-events.json', 'data/log-data/2018-11-11-events.json', 'data/log-data/2018-11-23-events.json', 'data/log-data/2018-11-24-events.json', 'data/log-data/2018-11-25-events.json', 'data/log-data/2018-11-20-events.json', 'data/log-data/2018-11-15-events.json', 'data/log-data/2018-11-01-events.json', 'data/log-data/2018-11-03-events.json', 'data/log-data/2018-11-13-events.json', 'data/log-data/2018-11-10-events.json', 'data/log-data/2018-11-28-events.json', 'data/log-data/2018-11-26-events.json', 'data/log-data/2018-11-04-events.json', 'data/log-data/2018-11-14-events.json', 'data/log-data/2018-11-27-events.json', 'data/log-data/2018-11-21-events.json', 'data/log-data/2018-11-06-events.json', 'data/log-data/2018-11-22-events.json', 'data/log-data/2018-11-12-events.json', 'data/log-data/2018-11-05-events.json', 'data/log-data/2018-11-09-events.json', 'data/log-data/2018-11-08-events.json', 'data/log-data/2018-11-02-events.json', 'data/log-data/2018-11-18-events.json',

In [17]:
from pyspark.sql.types import DoubleType, LongType, TimestampType 

In [28]:
df_log = spark.read.json(log_data_paths)


In [29]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [26]:
df_log.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,,Logged In,Wyatt,M,0,Scott,,free,"Eureka-Arcata-Fortuna, CA",GET,Home,1540872000000.0,563,,200,1542247071796,Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7....,9
4,,Logged In,Austin,M,0,Rosales,,free,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1541060000000.0,521,,200,1542252577796,Mozilla/5.0 (Windows NT 6.1; rv:31.0) Gecko/20...,12


In [30]:
log_schema = StructType([
         StructField( "artist" , StringType()),  
         StructField( "auth" , StringType()),  
         StructField( "firstName" , StringType()),  
         StructField( "gender" , StringType()),  
         StructField( "itemInSession" , IntegerType()),  
         StructField( "lastName" , StringType()),  
         StructField( "length" , DoubleType()),  
         StructField( "level" , StringType()),  
         StructField( "location" , StringType()),  
         StructField( "method" , StringType()),  
         StructField( "page" , StringType()),  
         StructField( "registration" , DoubleType()),  
         StructField( "sessionId" , IntegerType()),  
         StructField( "song" , StringType()),  
         StructField( "status" , IntegerType()),  
         StructField( "ts" , LongType()),  
         StructField( "userAgent" , StringType()),  
         StructField( "userId" , StringType())
    ])

In [31]:
# read log data file
df_log = spark.read.json(log_data_paths, log_schema)
df_log.printSchema()

# filter by actions for song plays
df_log = df_log.filter(df_log.page == "NextSong")

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [32]:
df_log.limit(5).toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Harmonia,Logged In,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Sehr kosmisch,200,1542241826796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
1,The Prodigy,Logged In,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,The Big Gundown,200,1542242481796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
2,Train,Logged In,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",PUT,NextSong,1541017000000.0,583,Marry Me,200,1542242741796,"""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",26
3,Sony Wonder,Logged In,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",PUT,NextSong,1540493000000.0,597,Blackbird,200,1542253449796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",61
4,Van Halen,Logged In,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",PUT,NextSong,1540794000000.0,602,Best Of Both Worlds (Remastered Album Version),200,1542260935796,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",80


## Users on log data

In [33]:
# user_id, first_name, last_name, gender, level
users_cols_log = ["userId as user_id", 
                  "firstName as first_name",  
                  "lastName as last_name", 
                  "gender", 
                  "level"]
users_table = extract_cols(col_names = users_cols_log, 
                           drop_dup_cols = ["user_id"], 
                           df = df_log, 
                           partitionBy = None, 
                           table_name = 'users',
                           if_use_select_expr = True)

In [34]:
users_table.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)



In [35]:
users_table.limit(5).toPandas()

Unnamed: 0,user_id,first_name,last_name,gender,level
0,51,Maia,Burke,F,free
1,7,Adelyn,Jordan,F,free
2,15,Lily,Koch,F,paid
3,54,Kaleb,Cook,M,free
4,101,Jayden,Fox,M,free


## Time table

In [36]:
# create timestamp column from original timestamp column
get_timestamp = udf(lambda x: datetime.fromtimestamp( (x/1000.0) ), T.TimestampType()) 
df_log = df_log.withColumn("timestamp", get_timestamp(df_log.ts))

In [37]:
df_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- song: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [38]:
# start_time, hour, day, week, month, year, weekday
time_cols_log = ["timestamp as start_time", 
                               "HOUR(timestamp) as hour",  
                               "DAY(timestamp) as day",  
                               "WEEKofYEAR(timestamp) as week",  
                               "MONTH(timestamp) as month",  
                               "YEAR(timestamp) as year",  
                               "DAYofWEEK(timestamp) as weekday"]

In [39]:
time_table = extract_cols(col_names = time_cols_log, 
                           drop_dup_cols = ["start_time"], 
                           df = df_log, 
                           partitionBy = ["year", "month"], 
                           table_name = 'time_table',
                           if_use_select_expr = True)

In [40]:
time_table.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)



In [41]:
time_table.limit(5).toPandas()

Unnamed: 0,start_time,hour,day,week,month,year,weekday
0,2018-11-21 06:18:12.796,6,21,47,11,2018,4
1,2018-11-14 15:20:15.796,15,14,46,11,2018,4
2,2018-11-05 16:31:59.796,16,5,45,11,2018,2
3,2018-11-13 18:00:26.796,18,13,46,11,2018,3
4,2018-11-30 04:32:02.796,4,30,48,11,2018,6


In [42]:
def process_song_data(spark, input_data, output_data):
    """
        This procedure extracts metadata of songs stored in S3 in JSON format.
        It prepares data frames for artists and songs table from this dataset.
        It saves these tables back to S3 in Parquet file format.
        Songs table is partitioned by year of song and artist ID while storing as Parquet file.
        args:
         spark             :    instance of Spark context
         input_data        :    file path of directory where datasets for songs is stored
         output_data       :    file path of directory where the dimensions tables are to be stored as Parquet format
    """
    # get filepath to song data file
    songPath = Path(input_data) / Path("song-data/**/*.json")
    song_data = glob.glob(songPath, recursive=True)

    # read song data file
    df = spark.read.json(song_data)

    # songs table columns
    song_cols = ['song_id',
                 'title',
                 'artist_id',
                 'year',
                 'duration']

    # songs table
    songs_table = extract_cols(col_names=song_cols,
                                             drop_dup_cols=["song_id"],
                                             df=df,
                                             partitionBy=["year", "artist_id"],
                                             table_name='songs')
    # artists table columns
    artist_cols = ["artist_id",
                   "artist_name as name",
                   "artist_location as location",
                   "artist_latitude as lattitude",
                   "artist_longitude as longitude"]
    # artists table
    artists_table = extract_cols(col_names=artist_cols,
                                 drop_dup_cols=["artist_id"],
                                 df=df,
                                 partitionBy=None,
                                 table_name='artists',
                                 if_use_select_expr=True)

In [43]:
process_song_data(spark, input_data='data/', output_data=output_data)

In [44]:
def process_log_data(spark, input_data, output_data):
    """
    Load JSON input data (log_data) from input_data path,
    process the data to extract users_table, time_table,
    songplays_table, and store the queried data to parquet files.
    args:
     spark             :    instance of Spark context
     input_data        :    file path of directory where datasets for user activity log is stored
     output_data       :    file path of directory where the dimensions tables are to be stored as Parquet format

    """

    # get filepath to log data file
    log_path = Path(input_data) / Path("log-data/*.json")
    log_data = glob.glob(log_path, recursive=True)

    # read log data file
    df_log = spark.read.json(log_data)

    # filter by actions for song plays
    df_log = df_log.filter(df_log.page == "NextSong")

    # columns for user table
    users_cols_log = ["userId as user_id",
                      "firstName as first_name",
                      "lastName as last_name",
                      "gender",
                      "level"]
    # users table
    users_table = extract_cols(col_names=users_cols_log,
                               drop_dup_cols=["user_id"],
                               df=df_log,
                               partitionBy=None,
                               table_name='users',
                               if_use_select_expr=True)

    # create timestamp column from original timestamp column
    get_timestamp = udf(
        lambda x: datetime.fromtimestamp(
            (x / 1000.0)),
        T.TimestampType())
    # adding timestamp column to df_log
    df_log = df_log.withColumn("timestamp", get_timestamp(df_log.ts))

    # columns for time table
    time_cols_log = ["timestamp as start_time",
                     "HOUR(timestamp) as hour",
                     "DAY(timestamp) as day",
                     "WEEKofYEAR(timestamp) as week",
                     "MONTH(timestamp) as month",
                     "YEAR(timestamp) as year",
                     "DAYofWEEK(timestamp) as weekday"]

    # time table
    time_table = extract_cols(col_names=time_cols_log,
                              drop_dup_cols=["start_time"],
                              df=df_log,
                              partitionBy=["year", "month"],
                              table_name='time_table',
                              if_use_select_expr=True)

    # read in song data to use for songplays table
    song_df = spark.read.json(song_data)
    # create temporary views to use them in the join
    song_df.createOrReplaceTempView("songs")
    df_log.createOrReplaceTempView("logs")

    # extract columns from joined song and log datasets to create songplays
    # table
    songplays_table = spark.sql("""
                                        SELECT l.timestamp AS start_time,
                                               l.userId AS user_id,
                                               l.level AS level,
                                               s.song_id AS song_id,
                                               s.artist_id AS artist_id,
                                               l.sessionId AS session_id,
                                               l.location AS locatioin,
                                               l.userAgent AS user_agent
                                        FROM logs l
                                        JOIN songs s ON (l.song=s.title
                                                         AND l.length=s.duration
                                                         AND l.artist=s.artist_name)
                                    """).withColumn("songplay_id", F.monotonically_increasing_id())

    # write songplays table to parquet files partitioned by year and month
    songplays_table.write.parquet(
        Path(output_data) /
        Path("songplays.parquet"),
        partitionBy=None,
        mode="overwrite")

In [45]:
process_log_data(spark = spark, input_data = 'data/', output_data=output_data )

# Join tables

In [46]:
# create temporary views to use them in the join
song_df.createOrReplaceTempView("songs")
df_log.createOrReplaceTempView("logs")
# extract columns from joined song and log datasets to create songplays table 
# songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
songplays_table = spark.sql("""
                                    SELECT l.timestamp AS start_time,
                                           l.userId AS user_id,
                                           l.level AS level,
                                           s.song_id AS song_id,
                                           s.artist_id AS artist_id,
                                           l.sessionId AS session_id,
                                           l.location AS locatioin,
                                           l.userAgent AS user_agent
                                    FROM logs l
                                    JOIN songs s ON (l.song=s.title
                                                     AND l.length=s.duration
                                                     AND l.artist=s.artist_name)
                                """).withColumn("songplay_id", F.monotonically_increasing_id())
# .withColumn("songplay_id", F.monotonically_increasing_id())
# write songplays table to parquet files partitioned by year and month
songplays_table.write.parquet(Path(output_data) / Path("songplays.parquet"), 
                        partitionBy=None, 
                        mode="overwrite")

In [47]:
songplays_table.limit(5).toPandas()

Unnamed: 0,start_time,user_id,level,song_id,artist_id,session_id,locatioin,user_agent,songplay_id
0,2018-11-21 21:56:47.796,15,paid,SOZCTXZ12AB0182364,AR5KOSW1187FB35FF4,818,"Chicago-Naperville-Elgin, IL-IN-WI","""Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5...",0


In [44]:

def retrieve_name(var):
    callers_local_vars = inspect.currentframe().f_back.f_locals.items()
    return [var_name for var_name, var_val in callers_local_vars if var_val is var]



In [56]:
from prettytable import PrettyTable
table_names = [songs_table ,artists_table, users_table,time_table,songplays_table]
with open('temp_log_table_output.txt','w') as f:
    for table_name in table_names:
        temp = retrieve_name(table_name)
        if len(temp)>2:
            temp = temp[1]
        else:
            temp = temp[0]
        f.write(f'\n{temp}: \n')
#         table_name.createOrReplaceGlobalTempView(temp)
        df_temp = table_name.limit(5).toPandas()
        table = PrettyTable(list(df_temp.columns))
        
        for _, row in df_temp.iterrows():
            table.add_row([*row])
        table = str(table).replace('+','|')
        f.write(table)
        
#         f.write(df_temp.to_markdown(tablefmt="grid"))

In [62]:
for t in table_names:
    temp = retrieve_name(t)
    if len(temp)>2:
        temp = temp[1]
    else:
        temp = temp[0]
    name = temp
    print(f'Table {name}' + '{')
    t.printSchema()
    print('}')

Table songs_table{
root
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- artist_id: string (nullable = true)
 |-- year: long (nullable = true)
 |-- duration: double (nullable = true)

}
Table artists_table{
root
 |-- artist_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- location: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

}
Table users_table{
root
 |-- user_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- level: string (nullable = true)

}
Table time_table{
root
 |-- start_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)

}
Table table_name{
root
 |-- start_t

In [57]:
spark.sql('SHOW TABLES').collect()

[Row(database='', tableName='logs', isTemporary=True),
 Row(database='', tableName='songs', isTemporary=True)]