#Imports


In [0]:
from pyspark.sql.functions import col, count, when, regexp_extract, regexp_replace, concat_ws, array
from pyspark.sql.types import TimestampType


#Load dataframes

In [0]:
##load pin data
file_location = "/mnt/pinterest_s3/topics/12256357c821.pin/partition=0/*.json" 
file_type = "json"
# Ask Spark to infer the schema
infer_schema = "true"
# Read in JSONs from mounted S3 bucket
df_pin = spark.read.format(file_type) \
.option("inferSchema", infer_schema) \
.load(file_location)

In [0]:
## load geo
file_location = "/mnt/pinterest_s3/topics/12256357c821.geo/partition=0/*.json" 
file_type = "json"
# Ask Spark to infer the schema
infer_schema = "true"
# Read in JSONs from mounted S3 bucket
df_geo = spark.read.format(file_type) \
.option("inferSchema", infer_schema) \
.load(file_location)

In [0]:
## load user data
file_location = "/mnt/pinterest_s3/topics/12256357c821.user/partition=0/*.json" 
file_type = "json"
# Ask Spark to infer the schema
infer_schema = "true"
# Read in JSONs from mounted S3 bucket
df_user = spark.read.format(file_type) \
.option("inferSchema", infer_schema) \
.load(file_location)

# Pin Data


## Task 1  Replace empty entries and entries with no relevant data in each column with Nones

### approach--> inspect each column and understand the data, firstly by figuring the unique values and seeing if everything is inline
1. category column has no missing values --> df_pin.select('category').distinct().show() 
2. description since there are more unique values I decided to take a difference approach here --> df_pin.groupBy('description').agg(count('*').alias('count')).orderBy('count', ascending=False).show()--> "No description av"
3. downloaded seems to have few distinct values and therefore df_pin.select('downloaded').distinct().show().orderBy('count', ascending=False).show() is useful here 0,1 where the only unique values
4. follower count --> df_pin.select(regexp_extract(col("follower_count"), "([a-zA-Z,.]+)", 1).alias("letters")).distinct().show() this will essentially help us with future tasks as well as checking what values are in the column
5. image_src --> pattern = r"^https://i\.pinimg\.com"  Filter rows where "image_src" doesn't match the pattern non_matching_entries = df_pin.filter(~col("image_src").rlike(pattern)) non_matching_entries.select("image_src").distinct().show()--> one entry is wrong so will replace it with none 
6. poster_name df_pin.groupBy('poster_name').agg(count('*').alias('count')).show().orderBy('count', ascending=False).show()
7. save_location similar pattern filteration pattern = r"^Local save in /data/\w+$" gave 4 values but the difference is that post data/ we had a "-" between words which don't represent an unusual value
8. tag_list -->  df_pin.groupBy('tag_list').agg(count('*').alias('count')).show().orderBy('count', ascending=False).show()--> 'N,o, ,T,a,g,s, ,A'
9. title --> df_pin.groupBy('title').agg(count('*').alias('count')).show().orderBy('count', ascending=False).show() --> No Title Data Ava
10. unique id check for pattern uuid_pattern = r"^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$" none returned
therefore, there are now empty entries or entries with non-relevant data to give the value none

In [0]:
# Replace "User Info Error" with None in the "poster_name" column
df_pin = df_pin.withColumn(
    "poster_name",
    when(col("poster_name").rlike("(?i)User Info Error"), None).otherwise(col("poster_name"))
)

# Replace "No Title Data Avai" with None in the "title" column
df_pin = df_pin.withColumn(
    "title",
    when(col("title").rlike("(?i)No Title Data"), None).otherwise(col("title"))
)

# Replace "User" with None in the "follower_count" column
df_pin = df_pin.withColumn(
    "follower_count",
    when(col("follower_count").rlike("(?i)User"), None).otherwise(col("follower_count"))
)

# Replace "No description available" with None in the "description" column
df_pin = df_pin.withColumn(
    "description",
    when(col("description").rlike("(?i)No description av"), None).otherwise(col("description"))
)

# Replace entries in the "tag_list" column that contain "N,o, ,T,a,g,s, ,A" with None
df_pin = df_pin.withColumn(
    "tag_list",
    when(col('tag_list').rlike("(?i)N,o, ,T,a,g,s, ,A"), None).otherwise(col("tag_list"))
)


## Task 2 
- Perform the necessary transformations on the follower_count to ensure every entry is a number. Make sure the data type of this column is an int.
- approach
  1. data has already been cleaned to only include numbers or numbers followed by "k" or "M"
  2. strip K and M and mulitply by 1000 and 1,000,000 respectively
  3. transform all strings into integers 

In [0]:
df_pin = df_pin.withColumn(
    "follower_count", 
    when(df_pin.follower_count.endswith("k"), 
         regexp_replace(df_pin.follower_count, "k", "").cast("int") * 1000)
    .otherwise(
        when(df_pin.follower_count.endswith("M"), 
             regexp_replace(df_pin.follower_count, "M", "").cast("int") * 1000000)
        .otherwise(df_pin.follower_count.cast("int"))
    )
)

## Task 3
- Clean the data in the save_location column to include only the save location path
- Approach 
  1. remove "Local save in " from save_location to just show pathname 


In [0]:
df_pin=df_pin.withColumn(
    "save_location",
    regexp_replace(df_pin.save_location,"Local save in","")
)

## Final Cleaning
- Rename the index column to ind.
- Reorder the DataFrame columns to have the following column order:
  1. ind
  2. unique_id
  3. title
  4. description
  5. follower_count
  6. poster_name
  7. tag_list 
  8. is_image_or_video
  9. image_src
  10. save_location
  11. category

In [0]:
## change name of index to ind
df_pin = df_pin.withColumnRenamed("index", "ind")


In [0]:
## reorder df columns
df_pin = df_pin.select(
    "ind",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "save_location",
    "category"
)


In [0]:
# df_pin.write.saveAsTable("pin_data")

# Geo Data 

## Task 1 
- Create a new column coordinates that contains an array based on the latitude and longitude columns and drop latitude and logitude

In [0]:
df_geo=df_geo.withColumn("coordinates",array("latitude","longitude"))

In [0]:
df_geo=df_geo.drop("latitude")
df_geo=df_geo.drop("longitude")

## Task 2 
- Convert the timestamp column from a string to a timestamp data type

In [0]:
df_geo = df_geo.withColumn("timestamp", col("timestamp").cast(TimestampType()))

## Final Cleaning
- reorder the dataframe columns to have the following column order: 
  1. ind 
  2. country
  3. coordinates
  4. timestamp

In [0]:
df_geo= df_geo.select(
    "ind",
    "country",
    "coordinates",
    "timestamp"
)

In [0]:
# df_geo.write.saveAsTable("geo_data")

# User Data

## Task 1 
- Create a new column user_name that concatenates the information found in the first_name and last_name columns and drop first_name and last_name

In [0]:
df_user = df_user.withColumn("user_name", concat_ws(" ", "first_name", "last_name"))
df_user=df_user.drop("first_name")
df_user=df_user.drop("last_name")

## Task 2
- Convert the date_joined column from a string to a timestamp data type

In [0]:
df_user=df_user.withColumn("date_joined",col("date_joined").cast(TimestampType()))

## Final Cleaning
- Reorder the DataFrame columns to have the following column order
  1. ind 
  2. user_name
  3. age
  4. date_joined

In [0]:
df_user=df_user.select(
    "ind",
    "user_name",
    "age",
    "date_joined"
)


In [0]:
# df_user.write.mode('overwrite').saveAsTable("user_data")