In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib

delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

aws_keys_df = spark.read.format("delta").load(delta_table_path)

In [0]:
ACCESS_KEY = aws_keys_df.select('Access key ID').collect()[0]['Access key ID']
SECRET_KEY = aws_keys_df.select('Secret access key').collect()[0]['Secret access key']

ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
stream_df = spark \
.readStream \
.format("kinesis") \
.option("streamName","Kinesis-Prod-Stream") \
.option("initialPosition","earliest") \
.option("region","us-east-1") \
.option("awsAccessKey", ACCESS_KEY) \
.option("awsSecretKey", SECRET_KEY) \
.load()

In [0]:
pin_schema = StructType([
    StructField("index", StringType(), True),
    StructField("unique_id", StringType(), True),
    StructField("title", StringType(), True),
    StructField("description", StringType(), True),
    StructField("poster_name", StringType(), True),
    StructField("follower_count", StringType(), True),
    StructField("tag_list", StringType(), True),
    StructField("is_image_or_video", StringType(), True),
    StructField("image_src", StringType(), True),
    StructField("downloaded", StringType(), True),
    StructField("save_location", StringType(), True),
    StructField("category", StringType(), True)
])

geo_schema = StructType([
    StructField("ind", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("latitude", StringType(), True),
    StructField("longitude", StringType(), True),
    StructField("country", StringType(), True)
])

user_schema =  StructType([
    StructField("ind", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("age", StringType(), True),
    StructField("date_joined", StringType(), True)
])

In [0]:
pin_df = stream_df.filter(stream_df.partitionKey == "pin").selectExpr("CAST(data as STRING) jsonData")
pin_df = pin_df.select(from_json("jsonData", pin_schema).alias("data")).select("data.*")

geo_df = stream_df.filter(stream_df.partitionKey == "geo").selectExpr("CAST(data as STRING) jsonData")
geo_df = geo_df.select(from_json("jsonData", geo_schema).alias("data")).select("data.*")

user_df = stream_df.filter(stream_df.partitionKey == "user").selectExpr("CAST(data as STRING) jsonData")
user_df = user_df.select(from_json("jsonData", user_schema).alias("data")).select("data.*")


In [0]:
%run "./s3_cleaning_functions"

In [0]:
cleaned_pin_df = clean_pin_data(pin_df)
cleaned_geo_df = clean_geo_data(geo_df)
cleaned_user_df = clean_user_data(user_df)

In [0]:
%sql
USE 0e59bc5e89eb_db

In [0]:
cleaned_pin_df.writeStream\
    .format("delta")\
    .outputMode("append")\
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/pin") \
    .table("streamed_pin_data")

cleaned_geo_df.writeStream\
    .format("delta")\
    .outputMode("append")\
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/geo") \
    .table("streamed_geo_data")

cleaned_user_df.writeStream\
    .format("delta")\
    .outputMode("append")\
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/user") \
    .table("streamed_user_data")

<pyspark.sql.streaming.query.StreamingQuery at 0x7f42141f8a90>