# Kinesis Streaming
1. Get the AWS authentication key file
2. Read the pin Kinesis stream
3. Clean the pin data
4. Save pin to a delta table
5. Read the geo Kinesis stream
6. Clean the geo data
7. Save geo to a delta table
8. Read the user Kinesis stream
9. Clean the user data
10. Save user to a delta table

## Get the AWS authentication key file

In [0]:
%run "./AWS Access Utils"

## Read the pin Kinesis stream

In [0]:
# Schema for the contents of the "data" column, written as comma-separated
# "name type" pairs (one entry per field)
schema = ", ".join(
    (
        "index long",
        "unique_id string",
        "title string",
        "description string",
        "poster_name string",
        "follower_count string",
        "tag_list string",
        "is_image_or_video string",
        "image_src string",
        "downloaded long",
        "save_location string",
        "category string",
    )
)

# Load the pin Kinesis stream into a dataframe and preview five rows
df_pin = read_stream("streaming-129a67850695-pin", schema)
df_pin.limit(5).display()

# Register the dataframe as a Global Temporary View so the cleaning notebook can pick it up
df_pin.createOrReplaceGlobalTempView("gtv_129a67850695_stream_pin")

## Clean the pin data

In [0]:
%run "./Clean Pin Data" $mode="Stream"

## Save pin to a delta table

In [0]:
# Load the cleaned pin data from the Global Temporary View created by the cleaning notebook,
# then preview five rows
df_pin_clean = spark.table(
    "global_temp.gtv_129a67850695_stream_pin_clean"
)
df_pin_clean.limit(5).display()

# Save the cleaned pin dataframe to its Delta Table
write_stream("129a67850695_pin_table", df_pin_clean)

## Read the geo Kinesis stream

In [0]:
# Schema for the contents of the "data" column, written as comma-separated
# "name type" pairs (one entry per field)
schema = ", ".join(
    (
        "ind long",
        "timestamp string",
        "latitude double",
        "longitude double",
        "country string",
    )
)

# Load the geo Kinesis stream into a dataframe and preview five rows
df_geo = read_stream("streaming-129a67850695-geo", schema)
df_geo.limit(5).display()

# Register the dataframe as a Global Temporary View so the cleaning notebook can pick it up
df_geo.createOrReplaceGlobalTempView("gtv_129a67850695_stream_geo")

## Clean the geo data

In [0]:
%run "./Clean Geo Data" $mode="Stream"

## Save geo to a delta table

In [0]:
# Load the cleaned geo data from the Global Temporary View created by the cleaning notebook,
# then preview five rows
df_geo_clean = spark.table(
    "global_temp.gtv_129a67850695_stream_geo_clean"
)
df_geo_clean.limit(5).display()

# Save the cleaned geo dataframe to its Delta Table
write_stream("129a67850695_geo_table", df_geo_clean)

## Read the user Kinesis stream

In [0]:
# Schema for the contents of the "data" column, written as comma-separated
# "name type" pairs (one entry per field)
schema = ", ".join(
    (
        "ind long",
        "first_name string",
        "last_name string",
        "age long",
        "date_joined string",
    )
)

# Load the user Kinesis stream into a dataframe and preview five rows
df_user = read_stream("streaming-129a67850695-user", schema)
df_user.limit(5).display()

# Register the dataframe as a Global Temporary View so the cleaning notebook can pick it up
df_user.createOrReplaceGlobalTempView("gtv_129a67850695_stream_user")

## Clean the user data

In [0]:
%run "./Clean User Data" $mode="Stream"

## Save user to a delta table

In [0]:
# Load the cleaned user data from the Global Temporary View created by the cleaning notebook,
# then preview five rows
df_user_clean = spark.table(
    "global_temp.gtv_129a67850695_stream_user_clean"
)
df_user_clean.limit(5).display()

# Save the cleaned user dataframe to its Delta Table
write_stream("129a67850695_user_table", df_user_clean)