In [0]:
# List the contents of /FileStore/tables, the folder the credentials CSV is loaded from in the next cell
dbutils.fs.ls("/FileStore/tables")

In [0]:
import urllib
import urllib.parse  # explicit submodule import: `import urllib` alone does not guarantee `urllib.parse` is loaded

from pyspark.sql.functions import *
from pyspark.sql.types import StringType, StructField, StructType  # needed by the first schema cell on a fresh kernel

# Read the CSV that holds the AWS credentials used by the Kinesis readers below.
# NOTE(review): the keys live in a plain CSV under /FileStore; a secret store would
# avoid keeping them readable on DBFS - confirm whether that is an option here.
file_type = "csv"
first_row_is_header = "true"
delimiter = ","
aws_keys_df = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

# Fetch both keys for the databricks user with a single lookup instead of
# filtering and collecting the same dataframe twice.
credentials_row = (
    aws_keys_df
    .where(col('User name') == 'databricks-user')
    .select('Access key ID', 'Secret access key')
    .first()
)
if credentials_row is None:
    # .first() returns None when no row matches; fail with a message that names the cause
    raise ValueError(
        "No row with User name 'databricks-user' found in "
        "/FileStore/tables/authentication_credentials.csv"
    )

# Get the AWS access key and secret key from the matching row
ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# Encode the secret key (safe="" percent-encodes every reserved character, including "/")
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
# Deletes the checkpoint folder (True = recurse into it) so that the write command can be run again
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", True)

def create_dataframe_from_stream_data(type_of_record):
    '''
    Opens the Kinesis stream for one record type and returns its payload as text

    The stream name is built from the record type, the stream is read from its
    earliest position, and the binary data column is cast to a string so that
    it can be parsed as JSON afterwards.

    Parameters
    ----------
    type_of_record: str
        the type of record the stream carries e.g. "pin"

    Returns
    -------
    df
        a streaming dataframe holding only the data column, cast to a string
    '''
    reader_options = {
        "streamName": f"streaming-0a4e65e909bd-{type_of_record}",
        "region": "us-east-1",
        "initialPosition": 'earliest',
        "awsAccessKey": ACCESS_KEY,
        "awsSecretKey": SECRET_KEY,
    }

    # Apply the options one by one, in the order they are listed above
    kinesis_reader = spark.readStream.format("kinesis")
    for option_name, option_value in reader_options.items():
        kinesis_reader = kinesis_reader.option(option_name, option_value)

    raw_stream = kinesis_reader.load()
    return raw_stream.selectExpr("CAST(data as STRING)")

def normalise_follower_count(df=None):
    '''
    Changes the follower count from an abbreviated string to an integer

    Values containing digits followed by "k" (e.g. "12k") are multiplied by 1,000
    and values containing digits followed by "M" (e.g. "5M") by 1,000,000.
    Every other value is cast to an integer as it is.

    Parameters
    ----------
    df: DataFrame, optional
        the dataframe whose follower_count column should be converted; when it is
        left out the notebook-level pin_df is used, so the original call with no
        arguments keeps working

    Returns
    -------
    df
        a new dataframe with follower_count replaced by its integer value
    '''
    if df is None:
        df = pin_df  # fall back to the notebook global, as the original implementation did

    follower_count = col("follower_count")
    # Leading run of digits, e.g. "12" out of "12k"
    leading_digits = regexp_extract(follower_count, "(\\d+)", 1).cast("integer")

    normalised = (
        when(follower_count.rlike("\\d+k"), leading_digits * 1000)  # one or more digits plus the letter k (e.g. 12k)
        .when(follower_count.rlike("\\d+M"), leading_digits * 1000000)  # one or more digits plus the letter M (e.g. 5M)
        .otherwise(follower_count.cast("integer"))
    )
    return df.withColumn("follower_count", normalised)

def create_delta_table(df, type_of_record):
    '''
    Starts a streaming write that appends the dataframe to a Delta table

    Parameters
    ----------
    df: DataFrame
        the streaming dataframe to write
    type_of_record: str
        the type of record held in the df e.g. "pin"; used to build both the
        table name and the checkpoint folder

    Returns
    -------
    None
    '''
    # Each streaming query needs a checkpoint folder of its own: sharing one between
    # the pin, geo and user writes mixes up their progress information. The folder
    # stays under /tmp/kinesis/_checkpoints/ so deleting that root still resets every stream.
    checkpoint_location = f"/tmp/kinesis/_checkpoints/{type_of_record}/"

    df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", checkpoint_location) \
    .table(f"0a4e65e909bd_{type_of_record}_table")

# Structure of the pin JSON payload: every field is read as a nullable string
schema = (
    StructType()
    .add("index", StringType(), True)
    .add("unique_id", StringType(), True)
    .add("title", StringType(), True)
    .add("description", StringType(), True)
    .add("poster_name", StringType(), True)
    .add("follower_count", StringType(), True)
    .add("tag_list", StringType(), True)
    .add("is_image_or_video", StringType(), True)
    .add("image_src", StringType(), True)
    .add("downloaded", StringType(), True)
    .add("save_location", StringType(), True)
    .add("category", StringType(), True)
)

# Final column order of the pin table ("downloaded" is not listed, so it is left out)
column_structure = ["ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category"]

# Read the pin stream and expand the JSON string into separate columns
pin_df = (
    create_dataframe_from_stream_data("pin")
    .withColumn("jsonData", from_json(col("data"), schema))
    .select("jsonData.*")
)

# normalise_follower_count reads the pin_df defined just above
pin_df = normalise_follower_count()

pin_df = (
    pin_df
    # Keep the part of save_location that starts at "/data/"
    .withColumn("save_location", regexp_extract(col("save_location"), "(/data/).*", 0))
    # Rename index to ind to match the geo and user dfs (ind is left as a string, no integer cast is applied)
    .withColumnRenamed("index", "ind")
    # Restructure the columns to the order in column_structure
    .select(column_structure)
)

display(pin_df)
create_delta_table(pin_df, "pin")


ind,unique_id,title,description,follower_count,poster_name,tag_list,is_image_or_video,image_src,save_location,category
2386,38c3e021-527e-4da6-8d43-c0aa07500d12,Christmas Five Senses Book,"The sights, sounds and smells of Christmas stay with many of us for a lifetime. Perhaps cinnamon in your nasal cavity automatically takes you back to Grandma’s kitchen. Or at th…",79000.0,Life Over C's,"Christmas Books,Christmas Themes,Kids Christmas,Xmas,Christmas Projects,Preschool Christmas Crafts,Christmas Activities,Kindergarten Christmas,Winter Activities",2386,2386,/data/christmas,christmas
2386,38c3e021-527e-4da6-8d43-c0aa07500d12,Christmas Five Senses Book,"The sights, sounds and smells of Christmas stay with many of us for a lifetime. Perhaps cinnamon in your nasal cavity automatically takes you back to Grandma’s kitchen. Or at th…",79000.0,Life Over C's,"Christmas Books,Christmas Themes,Kids Christmas,Xmas,Christmas Projects,Preschool Christmas Crafts,Christmas Activities,Kindergarten Christmas,Winter Activities",2386,2386,/data/christmas,christmas
2386,38c3e021-527e-4da6-8d43-c0aa07500d12,Christmas Five Senses Book,"The sights, sounds and smells of Christmas stay with many of us for a lifetime. Perhaps cinnamon in your nasal cavity automatically takes you back to Grandma’s kitchen. Or at th…",79000.0,Life Over C's,"Christmas Books,Christmas Themes,Kids Christmas,Xmas,Christmas Projects,Preschool Christmas Crafts,Christmas Activities,Kindergarten Christmas,Winter Activities",2386,2386,/data/christmas,christmas
2386,38c3e021-527e-4da6-8d43-c0aa07500d12,Christmas Five Senses Book,"The sights, sounds and smells of Christmas stay with many of us for a lifetime. Perhaps cinnamon in your nasal cavity automatically takes you back to Grandma’s kitchen. Or at th…",79000.0,Life Over C's,"Christmas Books,Christmas Themes,Kids Christmas,Xmas,Christmas Projects,Preschool Christmas Crafts,Christmas Activities,Kindergarten Christmas,Winter Activities",2386,2386,/data/christmas,christmas
2386,38c3e021-527e-4da6-8d43-c0aa07500d12,Christmas Five Senses Book,"The sights, sounds and smells of Christmas stay with many of us for a lifetime. Perhaps cinnamon in your nasal cavity automatically takes you back to Grandma’s kitchen. Or at th…",79000.0,Life Over C's,"Christmas Books,Christmas Themes,Kids Christmas,Xmas,Christmas Projects,Preschool Christmas Crafts,Christmas Activities,Kindergarten Christmas,Winter Activities",2386,2386,/data/christmas,christmas
2386,38c3e021-527e-4da6-8d43-c0aa07500d12,Christmas Five Senses Book,"The sights, sounds and smells of Christmas stay with many of us for a lifetime. Perhaps cinnamon in your nasal cavity automatically takes you back to Grandma’s kitchen. Or at th…",79000.0,Life Over C's,"Christmas Books,Christmas Themes,Kids Christmas,Xmas,Christmas Projects,Preschool Christmas Crafts,Christmas Activities,Kindergarten Christmas,Winter Activities",2386,2386,/data/christmas,christmas
7528,fbe53c66-3442-4773-b19e-d3ec6f54dddf,No Title Data Available,No description available Story format,,User Info Error,"N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",7528,7528,/data/mens-fashion,mens-fashion
2863,9bf39437-42a6-4f02-99a0-9a0383d8cd70,25 Super Fun Summer Crafts for Kids - Of Life and Lisa,Keep the kids busy this summer with these easy diy crafts and projects. Creative and…,124000.0,Of Life & Lisa | Lifestyle Blog,"Summer Crafts For Kids,Fun Crafts For Kids,Summer Kids,Toddler Crafts,Crafts To Do,Diy For Kids,Summer Snow,Diys For Summer,Craft Ideas For Girls",2863,2863,/data/diy-and-crafts,diy-and-crafts
5730,1e1f0c8b-9fcf-460b-9154-c775827206eb,Island Oasis Coupon Organizer,"Description Coupon Organizer in a fun colorful fabric -island oasis, Great Size for the ""basic"" couponer - holds up to 500 coupons with ease, and is made long enough so that you…",0.0,Consuelo Aguirre,"Grocery Items,Grocery Coupons,Care Organization,Coupon Organization,Extreme Couponing,Couponing 101,Life Binder,Save My Money,Love Coupons",5730,5730,/data/finance,finance
8304,5b6d0913-25e4-43ab-839d-85d5516f78a4,The #1 Reason You’re Not His Priority Anymore - Matthew Coast,#lovequotes #matchmaker #matchmadeinheaven #loveyourself #respectyourself,51000.0,Commitment Connection,"Wise Quotes,Quotable Quotes,Words Quotes,Wise Words,Quotes To Live By,Great Quotes,Motivational Quotes,Inspirational Quotes,Funny Quotes",8304,8304,/data/quotes,quotes


In [0]:
from pyspark.sql.types import StructType,StructField, StringType

# Structure of the geo JSON payload: every field is read as a nullable string
schema = (
    StructType()
    .add("ind", StringType(), True)
    .add("latitude", StringType(), True)
    .add("longitude", StringType(), True)
    .add("country", StringType(), True)
    .add("timestamp", StringType(), True)
)

# Final column order of the geo table
column_structure = ["ind", "country", "coordinates", "timestamp"]

geo_df = (
    create_dataframe_from_stream_data("geo")
    # Expand the JSON string into separate columns
    .withColumn("jsonData", from_json(col("data"), schema))
    .select("jsonData.*")
    # Combine the two coordinate columns into one array, longitude first
    .withColumn("coordinates", array(col("longitude"), col("latitude")))
    # Turn the timestamp string into a timestamp
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
    .select(column_structure)
)

display(geo_df)
create_delta_table(geo_df, "geo")

ind,country,coordinates,timestamp
2386,Montenegro,"List(-153.293, 21.6023)",2019-02-22T20:21:06.000+0000
2386,Montenegro,"List(-153.293, 21.6023)",2019-02-22T20:21:06.000+0000
2386,Montenegro,"List(-153.293, 21.6023)",2019-02-22T20:21:06.000+0000
2386,Montenegro,"List(-153.293, 21.6023)",2019-02-22T20:21:06.000+0000
2386,Montenegro,"List(-153.293, 21.6023)",2019-02-22T20:21:06.000+0000
2386,Montenegro,"List(-153.293, 21.6023)",2019-02-22T20:21:06.000+0000
2386,Montenegro,"List(-153.293, 21.6023)",2019-02-22T20:21:06.000+0000
2386,Montenegro,"List(-153.293, 21.6023)",2019-02-22T20:21:06.000+0000
2386,Montenegro,"List(-153.293, 21.6023)",2019-02-22T20:21:06.000+0000
2386,Montenegro,"List(-153.293, 21.6023)",2019-02-22T20:21:06.000+0000


In [0]:
# Structure of the user JSON payload: every field is read as a nullable string
schema = (
    StructType()
    .add("ind", StringType(), True)
    .add("first_name", StringType(), True)
    .add("last_name", StringType(), True)
    .add("age", StringType(), True)
    .add("date_joined", StringType(), True)
)

# Final column order of the user table
column_structure = ["ind", "user_name", "age", "date_joined"]

user_df = (
    create_dataframe_from_stream_data("user")
    # Expand the JSON string into separate columns
    .withColumn("jsonData", from_json(col("data"), schema))
    .select("jsonData.*")
    # Join first and last name (no separator) into user_name, then drop the two source columns
    .withColumn("user_name", concat(col("first_name"), col("last_name")))
    .drop("first_name", "last_name")
    # Turn the date_joined string into a timestamp
    .withColumn("date_joined", col("date_joined").cast("timestamp"))
    .select(column_structure)
)

display(user_df)
create_delta_table(user_df, "user")

ind,user_name,age,date_joined
2386,MichelleLloyd,25,2017-06-09T01:37:22.000+0000
2386,MichelleLloyd,25,2017-06-09T01:37:22.000+0000
2386,MichelleLloyd,25,2017-06-09T01:37:22.000+0000
2386,MichelleLloyd,25,2017-06-09T01:37:22.000+0000
2386,MichelleLloyd,25,2017-06-09T01:37:22.000+0000
2386,MichelleLloyd,25,2017-06-09T01:37:22.000+0000
2386,MichelleLloyd,25,2017-06-09T01:37:22.000+0000
2386,MichelleLloyd,25,2017-06-09T01:37:22.000+0000
2386,MichelleLloyd,25,2017-06-09T01:37:22.000+0000
2386,MichelleLloyd,25,2017-06-09T01:37:22.000+0000
