In [0]:
#Initial Setup

In [0]:
# List the files uploaded to DBFS under /FileStore/tables (the credentials CSV
# loaded in the next cells is read from this folder).
dbutils.fs.ls("/FileStore/tables")

In [0]:
# URL processing
import urllib
import urllib.parse  # `import urllib` alone does not load the `parse` submodule

# pyspark functions
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Load the AWS credentials export into a Spark DataFrame.
# Reader settings are kept as named variables so they are easy to tweak.
file_type = "csv"               # source format
first_row_is_header = "true"    # first line holds the column names
delimiter = ","                 # fields are comma-separated

aws_keys_df = (
    spark.read
    .format(file_type)
    .options(header=first_row_is_header, sep=delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

In [0]:
# Get the AWS access key and secret key for the 'databricks-user' row of the
# credentials DataFrame. The row is fetched once (instead of filtering and
# collecting the DataFrame separately for each key), and a clear error is raised
# if the user is missing instead of an opaque IndexError from collect()[0].
credentials_row = (
    aws_keys_df
    .where(col('User name') == 'databricks-user')
    .select('Access key ID', 'Secret access key')
    .first()
)
if credentials_row is None:
    raise ValueError(
        "No row with 'User name' == 'databricks-user' in authentication_credentials.csv"
    )
ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# URL-encode the secret key; safe="" means every reserved character (including
# '/') is percent-encoded.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
#    !! Important !!

# Reset cell: before re-running the writeStream cells, delete the checkpoint
# folder (the second argument, True, requests a recursive delete, so every
# subfolder under it is removed too).
# NOTE(review): once the checkpoints are gone, the Kinesis readers below
# presumably start again from initialPosition='earliest', and the Delta writes use
# outputMode("append"), so records already in the tables would be appended again.
dbutils.fs.rm("/tmp/kinesis/_checkpoints/", True)

In [0]:
#df_pin

#MAIN STEP - 1
#-----------------
# Open a streaming read of the Pinterest "pin" Kinesis stream
# (initialPosition='earliest'), authenticated with the keys loaded above.
df_pin = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-0adad64f7925-pin',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

In [0]:
#MAIN STEP - 2
#-----------------
# Cast the Kinesis payload to a JSON string and expand its fields into columns.

# Expected JSON fields of a pin record and the Spark type each is parsed as.
df_pin_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("category", StringType()),
        ("description", StringType()),
        ("downloaded", IntegerType()),
        ("follower_count", StringType()),
        ("image_src", StringType()),
        ("index", IntegerType()),
        ("is_image_or_video", StringType()),
        ("poster_name", StringType()),
        ("save_location", StringType()),
        ("tag_list", StringType()),
        ("title", StringType()),
        ("unique_id", StringType()),
    )
])

df_pin = (
    df_pin
    .select(from_json(col("data").cast("string"), df_pin_schema).alias("data"))
    .select(col("data.*"))
)

In [0]:
# Preview the parsed (not yet cleaned) pin records with Databricks `display`.
display(df_pin)

In [0]:
#MAIN STEP - 3
#-----------------

#Transforming
#######################

#Clean follower_count: "12k" -> 12000, "1.5M" -> 1500000; other values are cast
#directly to int.
# The numeric prefix keeps an optional decimal part (the old pattern "([0-9]+)"
# stopped at the '.', turning "1.5k" into 1000). It is scaled as a decimal, so
# e.g. "2.3k" gives exactly 2300 before the final cast back to int.
# NOTE(review): unparseable values (e.g. non-numeric text) rely on the cast
# returning null, which holds only with ANSI mode off; confirm for this runtime.
follower_count_number = regexp_extract(
    col("follower_count"), r"([0-9]+(?:\.[0-9]+)?)", 1
).cast("decimal(20,6)")
df_pin = df_pin.withColumn("follower_count_correct",
                            when(col("follower_count").endswith("k"),
                                 (follower_count_number * 1000).cast("int"))
                            .when(col("follower_count").endswith("M"),
                                 (follower_count_number * 1000000).cast("int"))
                            .otherwise(col("follower_count").cast("int"))
                          )

#Clean column 'save_location': keep only the path, i.e. everything from the first '/'
df_pin = df_pin.withColumn("save_location_correct",
                    regexp_extract("save_location", "/.*", 0))

#Renaming "index" to "ind"
df_pin = df_pin.withColumnRenamed('index', 'ind')

#dropping the old columns "follower_count", "save_location"
df_pin = df_pin.drop('follower_count', 'save_location')

#Giving the cleaned columns their original names
df_pin = df_pin.withColumnRenamed('follower_count_correct', 'follower_count')
df_pin = df_pin.withColumnRenamed('save_location_correct', 'save_location')

#Ordering (the parsed 'downloaded' column is not kept)
df_pin = df_pin.select("ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category")

In [0]:
#MAIN STEP - 6
#-----------------

# Preview the cleaned pin records before writing them to Delta.
display(df_pin)

In [0]:
# Show the DataFrame repr (column names and types) as the cell output.
df_pin

In [0]:
#MAIN STEP - 7
#-----------------

#Writing to delta table
# Each streaming query needs its own checkpoint directory. The pin, geo and user
# queries run at the same time, and sharing one checkpointLocation between
# queries mixes their offsets and metadata. The per-stream subfolder still sits
# under /tmp/kinesis/_checkpoints/, so the reset cell that removes that folder
# also clears it.
df_pin.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/pin/") \
    .table("0adad64f7925_pin_table")

In [0]:
#df_geo

#MAIN STEP - 1
#-----------------
# Open a streaming read of the geolocation Kinesis stream
# (initialPosition='earliest'), authenticated with the keys loaded above.
df_geo = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-0adad64f7925-geo',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

In [0]:
#display(df_geo)

In [0]:
#MAIN STEP - 2
#-----------------
# Cast the Kinesis payload to a JSON string and expand its fields into columns.

# Reference schema noted for the geo data:
# [country: string, ind: bigint, latitude: double, longitude: double, timestamp: string]
# Here `ind` is parsed as IntegerType; `timestamp` stays a string until step 3.
df_geo_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("ind", IntegerType()),
        ("timestamp", StringType()),
        ("latitude", DoubleType()),
        ("longitude", DoubleType()),
        ("country", StringType()),
    )
])

df_geo = (
    df_geo
    .select(from_json(col("data").cast("string"), df_geo_schema).alias("data"))
    .select(col("data.*"))
)

In [0]:
#MAIN STEP - 3
#-----------------

#Transformations
##################
# In a single projection:
#  - pack latitude/longitude into one `coordinates` array (the two source
#    columns are not kept),
#  - parse `timestamp` from string to a timestamp with to_timestamp,
#  - emit the columns in the order ind, country, coordinates, timestamp.
df_geo = df_geo.select(
    'ind',
    'country',
    array('latitude', 'longitude').alias('coordinates'),
    to_timestamp('timestamp').alias('timestamp'),
)


In [0]:
#MAIN STEP - 4
#-----------------

# Preview the cleaned geolocation records before writing them to Delta.
display(df_geo)

In [0]:
# Show the DataFrame repr (column names and types) as the cell output.
df_geo

In [0]:
#MAIN STEP - 5
#-----------------

#Writing to delta table
# Each streaming query needs its own checkpoint directory. The pin, geo and user
# queries run at the same time, and sharing one checkpointLocation between
# queries mixes their offsets and metadata. The per-stream subfolder still sits
# under /tmp/kinesis/_checkpoints/, so the reset cell that removes that folder
# also clears it.
df_geo.writeStream \
        .format("delta") \
        .outputMode("append") \
        .option("checkpointLocation", "/tmp/kinesis/_checkpoints/geo/") \
        .table("0adad64f7925_geo_table")

In [0]:
#df_user

#MAIN STEP - 1
#-----------------
# Open a streaming read of the user Kinesis stream
# (initialPosition='earliest'), authenticated with the keys loaded above.
df_user = (
    spark.readStream
    .format('kinesis')
    .options(
        streamName='streaming-0adad64f7925-user',
        initialPosition='earliest',
        region='us-east-1',
        awsAccessKey=ACCESS_KEY,
        awsSecretKey=SECRET_KEY,
    )
    .load()
)

In [0]:
#MAIN STEP - 2
#-----------------
# Cast the Kinesis payload to a JSON string and expand its fields into columns.

# Reference schema noted for the user data:
# DataFrame[ind: bigint, first_name: string, last_name: string, age: bigint, date_joined: timestamp]
# Here ind/age are parsed as IntegerType and date_joined as a string (converted in step 3).
df_user_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("ind", IntegerType()),
        ("first_name", StringType()),
        ("last_name", StringType()),
        ("age", IntegerType()),
        ("date_joined", StringType()),
    )
])

df_user = (
    df_user
    .select(from_json(col("data").cast("string"), df_user_schema).alias("data"))
    .select(col("data.*"))
)

In [0]:
#MAIN STEP - 3
#-----------------

#Transformations
#################
# In a single projection:
#  - join first_name and last_name with a space into `user_name` (the two source
#    columns are not kept),
#  - parse `date_joined` from string to a timestamp with to_timestamp,
#  - emit the columns in the order ind, user_name, age, date_joined.
df_user = df_user.select(
    'ind',
    concat_ws(" ", col('first_name'), col('last_name')).alias('user_name'),
    'age',
    to_timestamp('date_joined').alias('date_joined'),
)


In [0]:
#MAIN STEP - 4
#-----------------

# Preview the cleaned user records before writing them to Delta.
display(df_user)

In [0]:
# Show the DataFrame repr (column names and types) as the cell output.
df_user

In [0]:
#MAIN STEP - 5
#-----------------

#Writing to delta table
# Each streaming query needs its own checkpoint directory. The pin, geo and user
# queries run at the same time, and sharing one checkpointLocation between
# queries mixes their offsets and metadata. The per-stream subfolder still sits
# under /tmp/kinesis/_checkpoints/, so the reset cell that removes that folder
# also clears it.
df_user.writeStream \
        .format("delta") \
        .outputMode("append") \
        .option("checkpointLocation", "/tmp/kinesis/_checkpoints/user/") \
        .table("0adad64f7925_user_table")

In [0]:
#df_pin (restart the pin stream end to end)

#MAIN STEP - 1
#-----------------
# Clear only this query's checkpoint. Deleting the whole
# /tmp/kinesis/_checkpoints/ folder here would also remove the checkpoint state
# of the geo and user queries while they may still be running.
# NOTE(review): stop the earlier pin query first if it is still active.
dbutils.fs.rm("/tmp/kinesis/_checkpoints/pin/", True)


df_pin = spark \
.readStream \
.format('kinesis') \
.option('streamName','streaming-0adad64f7925-pin') \
.option('initialPosition','earliest') \
.option('region','us-east-1') \
.option('awsAccessKey', ACCESS_KEY) \
.option('awsSecretKey', SECRET_KEY) \
.load()

# Cast the Kinesis payload to a JSON string and expand its fields into columns.
df_pin = df_pin.selectExpr("CAST(data as STRING)")

df_pin_schema = StructType([StructField("category", StringType()),
                       StructField("description", StringType()),
                       StructField("downloaded", IntegerType()),
                       StructField("follower_count", StringType()),
                       StructField("image_src", StringType()),
                       StructField("index", IntegerType()),
                       StructField("is_image_or_video", StringType()),
                       StructField("poster_name", StringType()),
                       StructField("save_location", StringType()),
                       StructField("tag_list", StringType()),
                       StructField("title", StringType()),
                       StructField("unique_id", StringType())
                      ])

df_pin = df_pin.withColumn("data", from_json(df_pin["data"], df_pin_schema)).select(col("data.*"))

# Apply the same cleaning as the main pin pipeline so the appended rows match the
# cleaned schema written to 0adad64f7925_pin_table (ind, int follower_count, path
# save_location, fixed column order). Appending the raw parsed columns (index,
# downloaded, string follower_count) would conflict with that table under Delta's
# schema enforcement; no mergeSchema option is set.
# follower_count: "12k" -> 12000, "1.5M" -> 1500000 (the decimal part is kept and
# scaled exactly); other values are cast directly to int.
follower_count_number = regexp_extract(
    col("follower_count"), r"([0-9]+(?:\.[0-9]+)?)", 1
).cast("decimal(20,6)")
df_pin = df_pin.withColumn("follower_count_correct",
                            when(col("follower_count").endswith("k"),
                                 (follower_count_number * 1000).cast("int"))
                            .when(col("follower_count").endswith("M"),
                                 (follower_count_number * 1000000).cast("int"))
                            .otherwise(col("follower_count").cast("int"))
                          )
# save_location: keep only the path, i.e. everything from the first '/'
df_pin = df_pin.withColumn("save_location_correct",
                    regexp_extract("save_location", "/.*", 0))
df_pin = df_pin.withColumnRenamed('index', 'ind')
df_pin = df_pin.drop('follower_count', 'save_location')
df_pin = df_pin.withColumnRenamed('follower_count_correct', 'follower_count')
df_pin = df_pin.withColumnRenamed('save_location_correct', 'save_location')
df_pin = df_pin.select("ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category")


# Append to the Delta table using this stream's own checkpoint subfolder.
df_pin.writeStream \
  .format("delta") \
  .outputMode("append") \
  .option("checkpointLocation", "/tmp/kinesis/_checkpoints/pin/") \
  .table("0adad64f7925_pin_table")



