In [0]:
%run "/Workspace/Users/abigailgcox@outlook.com/pinterest_data_cleaning"

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib
import urllib.parse


# Path of the Delta table that stores the AWS access credentials.
# NOTE(review): consider a Databricks secret scope instead of a plain Delta table for credentials.
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Read the Delta table to a Spark DataFrame
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Fetch the first row once (a single Spark job) rather than running one collect()
# per column; both keys are taken from that same row.
_aws_keys_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if _aws_keys_row is None:
    # Fail with a clear message instead of an IndexError on an empty table.
    raise ValueError(f"No AWS credentials found in the Delta table at {delta_table_path}")
ACCESS_KEY = _aws_keys_row['Access key ID']
SECRET_KEY = _aws_keys_row['Secret access key']

# Percent-encode the secret key; safe="" means '/' is encoded as well.
# Not used in this part of the notebook — presumably kept for building S3 URLs elsewhere.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
%sql
-- Turn off the Delta format check (spark.databricks.delta.formatCheck.enabled) for this session.
-- NOTE(review): this check normally rejects reading a Delta table's path with a non-Delta
-- format; the reads/writes in this notebook use format("delta"), so confirm it is still needed.
SET spark.databricks.delta.formatCheck.enabled=false

key,value
spark.databricks.delta.formatCheck.enabled,False


In [0]:
# Schemas for the JSON records carried by the pin, geo and user Kinesis streams.
# Every field is declared nullable.
def _nullable_struct(*fields):
    """Build a StructType from (name, data_type) pairs, marking every field nullable."""
    return StructType([StructField(field_name, field_type, True) for field_name, field_type in fields])


pin_schema = _nullable_struct(
    ("index", IntegerType()),
    ("unique_id", StringType()),
    ("title", StringType()),
    ("description", StringType()),
    ("poster_name", StringType()),
    # Kept as a string here; presumably converted to a number during cleaning — confirm.
    ("follower_count", StringType()),
    ("tag_list", StringType()),
    ("is_image_or_video", StringType()),
    ("image_src", StringType()),
    ("downloaded", IntegerType()),
    ("save_location", StringType()),
    ("category", StringType()),
)

geo_schema = _nullable_struct(
    ("ind", IntegerType()),
    ("timestamp", StringType()),
    ("latitude", FloatType()),
    ("longitude", FloatType()),
    ("country", StringType()),
)

user_schema = _nullable_struct(
    ("ind", IntegerType()),
    ("first_name", StringType()),
    ("last_name", StringType()),
    ("age", IntegerType()),
    ("date_joined", StringType()),
)

In [0]:
def read_streaming_data(kinesis_stream_name, region='us-east-1', initial_position='earliest'):
    """Open a streaming DataFrame over a Kinesis data stream.

    Uses the 'kinesis' streaming source and authenticates with the module-level
    ACCESS_KEY / SECRET_KEY loaded earlier in this notebook.

    Args:
        kinesis_stream_name: Name of the Kinesis stream to read.
        region: AWS region hosting the stream (default 'us-east-1', the
            previously hard-coded value).
        initial_position: Value for the source's 'initialPosition' option
            (default 'earliest', the previously hard-coded value).

    Returns:
        An unstarted streaming DataFrame. Its record payload is expected in a
        `data` column (cast to STRING downstream by structure_data).
    """
    return (
        spark.readStream
        .format('kinesis')
        .option('streamName', kinesis_stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )

In [0]:
def structure_data(df, schema):
    """Parse the JSON payload of a raw stream DataFrame into typed columns.

    The `data` column is cast to STRING, parsed with `from_json` against
    `schema`, and the resulting struct is flattened so that each field of
    `schema` becomes a top-level column.

    Args:
        df: DataFrame with a `data` column, e.g. from read_streaming_data.
        schema: StructType describing the JSON records.

    Returns:
        DataFrame with one column per field in `schema`.
    """
    payload_text = df.selectExpr("CAST(data as STRING)")
    parsed = payload_text.select(from_json(col("data"), schema).alias("data"))
    return parsed.select(col('data.*'))

In [0]:
def write_to_delta_table(df, delta_table_name, checkpoint_root="/tmp/kinesis/_checkpoints/"):
    """Start an append-mode streaming write of `df` into a Delta table.

    Each target table gets its own checkpoint directory,
    ``<checkpoint_root>/<delta_table_name>``. Structured Streaming needs a
    distinct checkpoint location per query; a single shared directory lets
    concurrently running streams (geo, user, pin) overwrite each other's
    offsets and metadata.

    NOTE: moving to per-table checkpoints means existing queries start without
    their old stored offsets, so an append-mode write may reprocess records
    that are already in the target table.

    Args:
        df: Streaming DataFrame to write.
        delta_table_name: Name of the target Delta table. It is also used as
            the checkpoint subdirectory name.
        checkpoint_root: Parent directory for the per-table checkpoints.

    Returns:
        Whatever the stream writer's `.table(...)` call returns (presumably
        the StreamingQuery handle). Earlier callers that ignored the result
        are unaffected.
    """
    checkpoint_location = f"{checkpoint_root.rstrip('/')}/{delta_table_name}"
    return (
        df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        .table(delta_table_name)
    )

In [0]:
def run_pipeline(kinesis_stream_name=None, schema=None, delta_table_name=None):
    """Read a Kinesis stream, parse and clean it, stream it into Delta, and display it.

    The cleaning function is chosen by comparing `schema` with pin_schema,
    geo_schema and user_schema, in that order. The clean_* functions are
    presumably defined by the notebook loaded with %run at the top — confirm.

    Args:
        kinesis_stream_name: Kinesis stream to read.
        schema: One of pin_schema, geo_schema or user_schema.
        delta_table_name: Target Delta table for the cleaned stream.

    Raises:
        ValueError: if `schema` does not match any of the known schemas.
    """
    cleaners = (
        (pin_schema, clean_pin_data),
        (geo_schema, clean_geo_data),
        (user_schema, clean_user_data),
    )
    cleaner = next((clean for known_schema, clean in cleaners if schema == known_schema), None)
    if cleaner is None:
        # Fail fast: without a matching cleaner there is no DataFrame to write
        # (the old print-and-continue path crashed on an unbound `cleaned_df`).
        raise ValueError(
            "Unrecognised schema: pass pin_schema, geo_schema or user_schema to run_pipeline."
        )

    raw_spark_df = read_streaming_data(kinesis_stream_name)
    structured_df = structure_data(raw_spark_df, schema)
    cleaned_df = cleaner(structured_df)
    write_to_delta_table(cleaned_df, delta_table_name)
    display(cleaned_df)


# (Kinesis stream, schema, target Delta table) for each data source, started in this order.
for stream_name, stream_schema, table_name in (
    ('streaming-0a55250cde99-geo', geo_schema, '0a55250cde99_geo_table'),
    ('streaming-0a55250cde99-user', user_schema, '0a55250cde99_user_table'),
    ('streaming-0a55250cde99-pin', pin_schema, '0a55250cde99_pin_table'),
):
    run_pipeline(kinesis_stream_name=stream_name, schema=stream_schema, delta_table_name=table_name)


ind,user_name,age,date_joined
3185,Alexandra Fuentes,29,2016-01-03T02:33:53.000+0000
9812,Janet Andrews,28,2016-06-11T14:48:17.000+0000
5360,Anna Cross,38,2015-11-21T18:19:10.000+0000
9536,Bruce Lyons,20,2016-07-09T23:03:37.000+0000
9966,Abigail Ali,20,2015-10-24T11:23:51.000+0000
10114,Benjamin Brady,21,2016-05-11T06:22:07.000+0000
10909,Margaret Hancock,33,2016-08-04T13:07:57.000+0000
4662,Dominique Ford,42,2016-01-04T17:04:43.000+0000
7857,Benjamin Adams,24,2015-10-25T03:23:53.000+0000
7296,Christopher Mckee,38,2016-11-22T23:41:41.000+0000


In [0]:
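# Uncomment and run this line before re-running the streams. It deletes everything under
# /tmp/kinesis/_checkpoints/, so the streams start again from `initialPosition` instead of
# resuming from their stored offsets.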
# dbutils.fs.rm("/tmp/kinesis/_checkpoints/", True)