In [4]:
import os
os.environ["SPARK_HOME"] = "/home/osboxes/spark/spark-2.4.0-bin-hadoop2.7/"

# findspark locates the Spark install named by SPARK_HOME so that the
# `import pyspark` lines below resolve; it must run before them.
import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, levenshtein, to_date, to_timestamp, date_format

conf = pyspark.SparkConf().setAppName('appName').setMaster('local')
# Standalone cluster alternative: the master accepts applications on its RPC
# port (default 7077); 8080 is the master's web UI, not a submission endpoint.
#conf = pyspark.SparkConf().setAppName('appName').setMaster('spark://192.168.11.128:7077')

# getOrCreate() reuses an already-running context/session, so re-running this
# cell does not fail with "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

# To keep saveAsTable metadata in a Hive metastore across sessions, add
# .enableHiveSupport() to the builder chain above (it is a Builder method,
# not a SparkSession method).

In [88]:
from pyspark.sql.types import *
def set_Schema(cols):
    """Build a Spark ``StructType`` from a sequence of column specs.

    Each element of ``cols`` is indexed as ``spec[0]`` (the column name) and
    ``spec[1]`` (a ``DataType`` instance); any further items are ignored.
    Fields appear in the same order as ``cols`` and use ``StructField``'s
    default nullability.
    """
    fields = [StructField(spec[0], spec[1]) for spec in cols]
    return StructType(fields=fields)

In [98]:
import os

from pyspark.sql.functions import col, split, levenshtein, to_date, to_timestamp, date_format

# Default JDBC target for the staging database; can be overridden per call.
STAGING_JDBC_URL = "jdbc:postgresql://osboxes:5432/gig_stg"


def persistToStagingDB(csv_file, table_name, table_schema, jdbc_url=STAGING_JDBC_URL, mode=None):
    """Read a CSV with an explicit schema, apply per-table fixes, and write it via JDBC.

    Parameters
    ----------
    csv_file : str
        Path to a CSV file with a header row.
    table_name : str
        Target table name; also selects the table-specific transformations below.
    table_schema : StructType
        Schema applied when reading the CSV.
    jdbc_url : str, optional
        JDBC URL of the target database (defaults to the staging Postgres DB).
    mode : str or None, optional
        Save mode passed to ``DataFrameWriter.jdbc`` (e.g. 'append', 'overwrite').
        None keeps Spark's default behaviour, which fails if the table exists.

    Returns
    -------
    bool
        True if the write succeeded; False otherwise, after printing the error.
    """
    df = spark.read.csv(csv_file, header=True, encoding="UTF-8", quote='"', escape='"', schema=table_schema)
    # Other options that can come in handy during csv import are:
    # multiLine=True, ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True,
    # encoding="UTF-8", sep=',', maxColumns=2, inferSchema=True

    if table_name == 'spotify_top_200_weekly':
        # date_range holds "<start>--<end>": split once, cast each half to a date.
        date_parts = split(col("date_range"), "--")
        df = (df
              .withColumn("start_date", date_parts.getItem(0).cast(DateType()))
              .withColumn("end_date", date_parts.getItem(1).cast(DateType())))

    elif table_name == 'songkick_uk_events':
        df = (df
              .withColumn("start_date", to_date("start_date", "d/M/y"))
              .withColumn("start_time", to_timestamp("start_time", "H:mm")))

    # Credentials come from the standard libpq environment variables, falling
    # back to the previously hard-coded values so existing setups keep working.
    properties = {
        "user": os.environ.get("PGUSER", "postgres"),
        "password": os.environ.get("PGPASSWORD", "postgres"),
    }

    # Persist to the Postgres DB - available after the Spark session closes.
    try:
        df.write.jdbc(jdbc_url, table_name, mode=mode, properties=properties)
        return True
    except Exception as exc:
        # Keep the best-effort True/False contract, but surface why it failed.
        print("Failed to write table {!r} to {}: {}".format(table_name, jdbc_url, exc))
        return False
        

In [83]:
# Exploration: show the signature and format rules of to_timestamp, which persistToStagingDB uses.
help(to_timestamp)

Help on function to_timestamp in module pyspark.sql.functions:

to_timestamp(col, format=None)
    Converts a :class:`Column` of :class:`pyspark.sql.types.StringType` or
    :class:`pyspark.sql.types.TimestampType` into :class:`pyspark.sql.types.DateType`
    using the optionally specified format. Specify formats according to
    `SimpleDateFormats <http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html>`_.
    By default, it follows casting rules to :class:`pyspark.sql.types.TimestampType` if the format
    is omitted (equivalent to ``col.cast("timestamp")``).
    
    >>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
    >>> df.select(to_timestamp(df.t).alias('dt')).collect()
    [Row(dt=datetime.datetime(1997, 2, 28, 10, 30))]
    
    >>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
    >>> df.select(to_timestamp(df.t, 'yyyy-MM-dd HH:mm:ss').alias('dt')).collect()
    [Row(dt=datetime.datetime(1997, 2, 28, 10, 30))]
    
    .. ver

In [13]:
# Remember to always keep column names lowercase: Postgres folds unquoted
# identifiers to lowercase, so mixed-case names would need double quotes in SQL.
# source: https://stackoverflow.com/questions/56145841/pandas-adding-double-quotes-on-column-names
table_name = 'spotify_top_200_weekly'
columns = [('week_position', IntegerType()),
           ('track_name', StringType()),
           ('artist', StringType()),
           # DoubleType rather than FloatType: a 32-bit float cannot represent
           # every integer above 2**24 (~16.8M), so large stream counts would be rounded.
           ('streams', DoubleType()),
           ('date_range', StringType()),
           ('region', StringType()),
           ('spotify_id', StringType())]
simple_struct = set_Schema(columns)

csv_file = '../datasets/spotify_top_200_weekly.csv'

persistToStagingDB(csv_file, table_name, simple_struct)

True

In [14]:
# Column names are kept lowercase so Postgres does not require quoting them.
# source: https://stackoverflow.com/questions/56145841/pandas-adding-double-quotes-on-column-names

table_name = 'spotify_track_details'
columns = [
    ('track_spotify_id', StringType()),
    ('artist_spotify_id', StringType()),
    ('artist_mbid', StringType()),
    ('artist_name', StringType()),
    ('track_url', StringType()),
    ('track_popularity', FloatType()),
    ('track_duration_ms', IntegerType()),
    ('track_is_local', BooleanType()),
    ('album_id', StringType()),
    ('album_track_number', IntegerType()),
    # NOTE(review): confirm every album_release_date in the CSV is a full date;
    # year- or month-only values may not parse as DateType.
    ('album_release_date', DateType()),
    ('album_type', StringType()),
]
simple_struct = set_Schema(columns)
csv_file = '../datasets/spotify_track_details.csv'

persistToStagingDB(csv_file=csv_file, table_name=table_name, table_schema=simple_struct)

True

In [29]:
# Stage the Songkick UK venues CSV.
# NOTE(review): the recorded run of this cell returned False, i.e. the JDBC
# write failed; the cause is not visible from the saved output.
table_name = 'songkick_uk_venues'
columns = [
    ('venue_id', IntegerType()),
    ('name', StringType()),
    ('street', StringType()),
    ('post_code', StringType()),
    ('city', StringType()),
    ('country', StringType()),
    ('capacity', FloatType()),
    ('website', StringType()),
    ('phone', StringType()),
]
simple_struct = set_Schema(columns)
csv_file = '../datasets/ukVenues.csv'

persistToStagingDB(csv_file=csv_file, table_name=table_name, table_schema=simple_struct)

False

In [99]:
# start_date and start_time are read as plain strings here; persistToStagingDB
# converts them (to_date / to_timestamp) for the songkick_uk_events table.
table_name = 'songkick_uk_events'
columns = [
    ('event_id', IntegerType()),
    ('event_type', StringType()),
    ('uri', StringType()),
    ('age_restriction', StringType()),
    ('mbid', StringType()),
    ('venue_id', IntegerType()),
    ('start_date', StringType()),
    ('start_time', StringType()),
    ('country', StringType()),
    ('flagged_as_ended', BooleanType()),
]
simple_struct = set_Schema(columns)
csv_file = '../datasets/ukEvents.csv'

persistToStagingDB(csv_file=csv_file, table_name=table_name, table_schema=simple_struct)

True

In [21]:
# Exploration: list the Spark SQL data types available for building schemas with set_Schema.
help(pyspark.sql.types)

Help on module pyspark.sql.types in pyspark.sql:

NAME
    pyspark.sql.types

DESCRIPTION
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    #

CLASSES
    builtins.object
        DataType
            ArrayType


## CSV read options: pass `quote='"'` and `escape='"'` to `spark.read.csv` so quoted strings that contain commas stay in a single field

In [None]:

# Load the weekly chart CSV into the DataFrame used by the exploration and storage cells below.
# The schema is rebuilt here rather than reusing `simple_struct`, which every
# staging cell above reassigns (after a top-to-bottom run it holds the
# songkick_uk_events schema).
spotify_top_200_schema = set_Schema([('week_position', IntegerType()),
                                     ('track_name', StringType()),
                                     ('artist', StringType()),
                                     ('streams', DoubleType()),
                                     ('date_range', StringType()),
                                     ('region', StringType()),
                                     ('spotify_id', StringType())])
df_weeklyTop200 = spark.read.csv('../datasets/spotify_top_200_weekly.csv', header=True, nullValue='',
                                 encoding="UTF-8", quote='"', escape='"',
                                 schema=spotify_top_200_schema)



In [None]:
# Spot-check every weekly chart row for a single Spotify track id.
df_weeklyTop200.where(df_weeklyTop200['spotify_id'] == '5mZXWEH2eh8zMZGCxT5aW0').show(truncate=False, vertical=True)

## Persist dataframe to temporary and permanent storage

In [None]:
# Register a session-scoped temp view (not available once the session closes),
# then confirm it is queryable by counting its rows.
df_weeklyTop200.createOrReplaceTempView('spotify_top_200_weekly')
row_count_df = spark.sql('select count(1) from spotify_top_200_weekly')
row_count_df.show()


In [None]:
# Save as a managed table in the session's warehouse directory (overwriting any previous run).
# Hive support is commented out in the setup cell, so unless it is enabled the
# table is registered in Spark's default in-memory catalog. The parquet files
# under spark-warehouse/ outlive the session and can be read back with
# spark.read.parquet('spark-warehouse/spotify_top_200_weekly/').
# The format is explicit so that read-back does not depend on spark.sql.sources.default.
df_weeklyTop200.write.mode('overwrite').format('parquet').saveAsTable('spotify_top_200_weekly')

In [None]:
# Read the saved table's parquet files directly (path is relative to the notebook's working directory) and count rows.
spark.read.load('spark-warehouse/spotify_top_200_weekly/', format='parquet').count()