In [0]:
# imports functions like median, max, etc
from pyspark.sql.functions import *
import urllib

In [0]:
# File path to table with S3 bucket credentails
s3_creds_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Creates dataframe from the above table
creds_df = spark.read.format("delta").load(s3_creds_path)

# Stores the values from the table above
ACCESS_KEY = creds_df.select("ACCESS key ID").collect()[0]["ACCESS key ID"]
SECRET_KEY = creds_df.select("Secret access key").collect()[0]["Secret access key"]

ENCODED_SECRET_KEY = urllib.parse.quote(string = SECRET_KEY, safe = "")

In [0]:
# Name of S3 bucket
AWS_S3_BUCKET = "user-0affe94cc7d3-bucket"

# Name of the mount from databricks to S3
MOUNT_NAME = "/mnt/pinterest_s3_mount"

SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)

# This does the mounting
dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

True

In [0]:
%sql
SET spark.databricks.delta.formatCheck.enabled = false

key,value
spark.databricks.delta.formatCheck.enabled,False


In [0]:
# Specifies the type and location of the files in the S3 bucket
file_type = "json"
pin_file_loc = "s3://user-0affe94cc7d3-bucket/topics/0affe94cc7d3.pin/partition=0/*.json"
geo_file_loc = "s3://user-0affe94cc7d3-bucket/topics/0affe94cc7d3.geo/partition=0/*.json"
user_file_loc = "s3://user-0affe94cc7d3-bucket/topics/0affe94cc7d3.user/partition=0/*.json"

infer_schema = "true"

# Creates dataframes for each of the topic folders
# .distinct because data generation was random sampling creating lots of duplicates
df_pin = spark.read.format(file_type)\
  .option("inferSchema", infer_schema)\
  .load(pin_file_loc).distinct()

df_geo = spark.read.format(file_type)\
  .option("inferSchema", infer_schema)\
  .load(geo_file_loc).distinct()

df_user = spark.read.format(file_type)\
  .option("inferSchema", infer_schema)\
  .load(user_file_loc).distinct()

In [0]:
# List of the common no relevant data strings in the pin dataframe
value_to_replace = ["No description available Story format",
                    "User Info Error", "Image scr error.",
                    "User Info Error", "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
                    "No Title Data Available"]

# The columns where the above strings appear
replace_in_column = ["description", "follower_count",
                     "image_src", "poster_name",
                     "tag_list", "title"]

# Replaces the no relevant data strings with the None type
df_pin = df_pin.replace(value_to_replace, None, subset = replace_in_column)

In [0]:
# Searches for (k, M) and replaces with (000, 000000) in the follower_count column
df_pin = df_pin.withColumn("follower_count",
                           regexp_replace("follower_count", "M", "000000"))
df_pin = df_pin.withColumn("follower_count",
                           regexp_replace("follower_count", "k", "000"))

# Removes the string "Local save in " from the save_location column
df_pin = df_pin.withColumn("save_location",
                           regexp_replace("save_location", "Local save in ", ""))

In [0]:
# Changes the data types of three columns 
df_pin = df_pin.withColumn("follower_count", col("follower_count").cast("int"))\
  .withColumn("index", col("index").cast("int"))\
  .withColumn("downloaded", col("downloaded").cast("int"))

In [0]:
# Renames the "index" column to "ind"
df_pin = df_pin.withColumnRenamed("index", "ind")

In [0]:
# Reorders the columns of the pin dataframe
df_pin = df_pin.select(df_pin.ind, df_pin.unique_id,
                       df_pin.title, df_pin.description,
                       df_pin.follower_count, df_pin.poster_name,
                       df_pin.tag_list, df_pin.is_image_or_video,
                       df_pin.image_src, df_pin.save_location, df_pin.category)

In [0]:
# Combines the latitube and longitude columns into column of arrays
df_geo = df_geo.withColumn("coordinates", array("latitude", "longitude"))

# Removes the now redundant latitude and longitude columns
df_geo = df_geo.drop("latitude", "longitude")

In [0]:
# Changes the data type of the timestamp column from str to timestamp
df_geo = df_geo.withColumn("timestamp", to_timestamp("timestamp"))

In [0]:
# Reorders the columns of the geo dataframe
df_geo = df_geo.select(df_geo.ind, df_geo.country,
                       df_geo.coordinates, df_geo.timestamp)

In [0]:
# Combines the first_name and last_name columns into a single column
df_user = df_user.withColumn("user_name", concat("first_name", lit(" "), "last_name"))

# Removes the first_name and last_name columns
df_user = df_user.drop("first_name", "last_name")

In [0]:
# Changes the data type of the date_joined column from str to timestamp
df_user = df_user.withColumn("date_joined", to_timestamp("date_joined"))

In [0]:
# Reorders the columns of the user dataframe
df_user = df_user.select(df_user.ind, df_user.user_name,
                         df_user.age, df_user.date_joined)

In [0]:
# Joins the pin and geo dataframes
df_pin_geo = df_pin.join(df_geo, df_geo["ind"] == df_pin["ind"], how = "left")

# Groups data by country then preforms the aggregations mode and count
pop_cat_by_loc = df_pin_geo.groupBy("country")\
  .agg(mode("category").alias("most_popular_category"),
                          count("category").alias("total_number_of_categories"))
  
display(pop_cat_by_loc)

country,most_popular_category,total_number_of_categories
Afghanistan,education,14
Albania,art,12
Algeria,quotes,14
American Samoa,tattoos,13
Andorra,tattoos,10
Angola,diy-and-crafts,3
Anguilla,diy-and-crafts,4
Antarctica (the territory South of 60 deg S),tattoos,5
Antigua and Barbuda,art,8
Argentina,tattoos,9


In [0]:
# Selects the relevant columns year from timestamp and category
# Groups data by year then aggregates with mode and count
pop_cat_by_year = df_pin_geo.select(year("timestamp")\
  .alias("post_year"),"category")\
  .groupBy("post_year").agg(mode("category").alias("mode category"),
                            count("category").alias("number_of_categories"))
  
pop_cat_by_year.show(15)

+---------+--------------+--------------------+
|post_year| mode category|number_of_categories|
+---------+--------------+--------------------+
|     2018|           art|                  79|
|     2022|     christmas|                  63|
|     2019|diy-and-crafts|                  75|
|     2020|       finance|                  81|
|     2017|        quotes|                  15|
|     2021|        quotes|                  67|
+---------+--------------+--------------------+



In [0]:
# Selects the relevant columns country, poster_name and follower_count
# Groups data by country then aggregates with max_by and max
pop_user_by_loc = df_pin_geo.select("country", "poster_name","follower_count")\
  .groupBy("country").agg(max_by("poster_name", "follower_count").alias("poster_name"),
       max("follower_count").alias("follower_count"))
  
pop_user_by_loc.show(15)

+--------------------+--------------------+--------------+
|             country|         poster_name|follower_count|
+--------------------+--------------------+--------------+
|         Afghanistan|                9GAG|       3000000|
|             Albania|   The Minds Journal|       5000000|
|             Algeria|           YourTango|        942000|
|      American Samoa|         Mamas Uncut|       8000000|
|             Andorra|Teachers Pay Teac...|       1000000|
|              Angola|           Tastemade|       8000000|
|            Anguilla|Kristen | Lifesty...|         92000|
|Antarctica (the t...|          Refinery29|       1000000|
| Antigua and Barbuda|Country Living Ma...|       1000000|
|           Argentina|         Next Luxury|        800000|
|             Armenia|Michelle {CraftyM...|        892000|
|               Aruba|         GQ Magazine|        874000|
|           Australia|   Cultura Colectiva|       1000000|
|             Austria|The World Pursuit...|         8900

In [0]:
# Selects the relevant columns country and follower_count
# Aggregates with max_by and max
loc_with_user_with_max_follower = pop_user_by_loc.select("country","follower_count")\
  .agg(max_by("country", "follower_count"), max("follower_count").alias("follower_count"))
  
display(loc_with_user_with_max_follower)

"max_by(country, follower_count)",follower_count
American Samoa,8000000


In [0]:
# Creates a lambda function for determining age group from age
age_grp = udf(lambda age: "18-24" if 18 <= age <= 24 else
              "25-35" if 25 <= age <= 35 else
              "36-50" if 36 <= age <= 50 else
              "+50" if 51 <= age else "")

# Joins the pin and user dataframes
df_pin_user = df_pin.join(df_user, df_pin["ind"] == df_user["ind"], how = "left")

# Selects the relevant columns age and category
# Creates a new column using the age group lambda function
# Removes the age column
# Groups data by age group then aggregates with mode and count
pop_cat_by_age_grp = df_pin_user.select("age", "category")\
  .withColumn("age_group", age_grp(df_pin_user["age"]))\
  .drop("age")\
  .groupBy("age_group").agg(mode("category").alias("category"),
                            count("category").alias("category_count"))
  
display(pop_cat_by_age_grp)

age_group,category,category_count
36-50,quotes,77
+50,mens-fashion,33
18-24,tattoos,166
25-35,travel,104


In [0]:
# Selects the relevant columns age and follower_count
# Creates a new column using the age group lambda function
# Removes the age column
# Groups data by age group then aggregates with median
# Orders by the median (ascending)
median_follower_by_age_grp = df_pin_user.select("age", "follower_count")\
  .withColumn("age_group", age_grp(df_pin_user["age"]))\
  .drop("age")\
  .groupBy("age_group").agg(median("follower_count").alias("median_follower_count"))\
  .orderBy("median_follower_count")
  
display(median_follower_by_age_grp)

age_group,median_follower_count
+50,3000.0
36-50,6000.0
25-35,27000.0
18-24,106000.0


In [0]:
# Selects the relevant columns year from date_joined and ind
# Groups data by year then aggregates with count
users_joined_by_year = df_user.select(year("date_joined").alias("year"),"ind")\
  .groupBy("year").agg(count("ind").alias("number_user_joined"))
  
display(users_joined_by_year)

year,number_user_joined
2015,137
2016,182
2017,61


In [0]:
# Joins the geo and pin dataframes
df_geo_pin = df_geo.join(df_pin, df_geo["ind"] == df_pin["ind"], how = "left")

# Selects the relevant columns year from timestamp and follower_count
# Groups data by year then aggregates with median
# Orders results by year (ascending)
median_follower_by_year = df_geo_pin.select(year("timestamp").alias("post_year"), "follower_count")\
  .groupBy("post_year").agg(median("follower_count").alias("median_follower_count"))\
  .orderBy("post_year")
  
display(median_follower_by_year)

post_year,median_follower_count
2017,72000.0
2018,23500.0
2019,25000.0
2020,26000.0
2021,19000.0
2022,37000.0


In [0]:
# Joins the user and geo dataframes into a dataframe X
# Joins the X and pin dataframes
# Creates a new column using the age group lambda function
# Removes the age column
# Selects the relevant columns age_group, year from timestamp and follower_count
# Groups data by year then by age group then aggregates with median
# Orders results by year
median_follower_by_year_by_age_grp = df_user.join(df_geo, df_geo["ind"] == df_user["ind"],
                                                  how = "left")\
  .join(df_pin, df_pin["ind"] == df_user["ind"], how = "left")\
  .withColumn("age_group", age_grp("age"))\
  .drop("age")\
  .select("age_group", year("timestamp").alias("post_year"), "follower_count")\
  .groupBy("post_year", "age_group").agg(median("follower_count").alias("median_follower_count"))\
  .orderBy("post_year")

display(median_follower_by_year_by_age_grp)

post_year,age_group,median_follower_count
2017,+50,170000.0
2017,36-50,72000.0
2017,25-35,36500.0
2017,18-24,706500.0
2018,+50,5000.0
2018,36-50,1854.5
2018,18-24,75000.0
2018,25-35,22000.0
2019,36-50,6000.0
2019,18-24,51000.0


In [0]:
# Disconnects from the S3 bucket
dbutils.fs.unmount(MOUNT_NAME)

/mnt/pinterest_s3_mount has been unmounted.


True