In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import explode, col, lit, when
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
from pyspark.sql.functions import monotonically_increasing_id



#### Part 1 - Read Json Data 


In [0]:
base_s3_path = "s3://s3-apify-instagram-raw-dta/instagram/{}-2023-03-01.json"
base_s3_pfl_path = "s3://s3-apify-instagram-raw-dta/instagram/Account Profiles/"

##### 1.1 Post Metrics

In [0]:
competitors_lst = ["waterloosparkling","purelifecanada","lacroixwater", "bublywater", "perriercanada", "monsterenergy", "sanpellegrinoca","drinkspindrift", "montelliercanada"] 
# schweppesus brand has no 2023 posts
# purelifecanada has no videoViewcount
# Brand name association


In [0]:
pos_metrics = [
    "element.id", "element.ownerUsername", "element.url", "element.commentsCount", 
    "element.likesCount", "element.timestamp", "element.ownerId", 
    "element.caption", "element.type", 
    "element.videoViewCount", "element.videoPlayCount"
]


In [0]:
first_path = base_s3_path.format(competitors_lst[0])
master_df = spark.read.json(first_path)
exploded_df = master_df.select(explode(master_df.data).alias("element"))
master_df = exploded_df.select(*pos_metrics)
master_df = master_df.withColumn("Brand_Association", lit(competitors_lst[0]))


In [0]:
# Check and adjust schema if any brands without any video post:
# Define missing fields and their types
missing_fields = {
    "videoViewCount": LongType(),
    "videoPlayCount": LongType(),
}

def add_missing_fields_to_df(df):
    # List the current fields in the 'element' struct
    current_fields = [col("element." + field.name) for field in df.schema["element"].dataType]

    # For each missing field, if it doesn't exist, add a null column of the correct type
    for field, dtype in missing_fields.items():
        if not any([fld.name == field for fld in df.schema["element"].dataType]):
            current_fields.append(lit(None).cast(dtype).alias(field))

    # Rebuild the 'element' struct with the original + new fields
    df = df.withColumn("element", struct(*current_fields))

    return df

for competitor in competitors_lst[1:]:
    s3_path = base_s3_path.format(competitor)
    df = spark.read.json(s3_path)
    exploded_df = df.select(explode(df.data).alias("element"))

    # Add missing fields
    exploded_df = add_missing_fields_to_df(exploded_df)
    
    df = exploded_df.select(*pos_metrics)
    df = df.withColumn("Brand_Association", lit(competitor))
    if master_df is None:
        master_df = df
    else:
        master_df = master_df.union(df)



In [0]:
master_df.display()

id,ownerUsername,url,commentsCount,likesCount,timestamp,ownerId,caption,type,videoViewCount,videoPlayCount,Brand_Association
3192015323732758344,waterloosparkling,https://www.instagram.com/p/CxMUQZgsc9I/,6,151,2023-09-15T00:31:40.000Z,4931723958,"New season—new mocktail. Now pouring: Autumn Sangria featuring our limited time flavors, Spiced Apple and Cranberry. Swipe for recipe. Happy Hour anyone? #WaterlooSparklingWater #WaterDownNothing #mocktails #falldrinks #SparklingWater",Sidecar,,,waterloosparkling
3192449134413029446,waterloosparkling,https://www.instagram.com/p/CxN25LMMSBG/,3,83,2023-09-15T14:53:34.000Z,4931723958,Gameday ritual #7: Always have Waterloo on hand. 🍊 #WaterlooSparklingWater #WaterDownNothing #footballfamily #biggestfan #SparklingWater,Image,,,waterloosparkling
3181641419561905486,waterloosparkling,https://www.instagram.com/p/CwndgOXqC1O/,3,733,2023-08-31T17:03:54.000Z,4931723958,#LaborDay Slay: Pulling up to the cookout with enough Waterloo to share. 😉 #WaterlooSparklingWater #WaterDownNothing #DoYouWaterloo #LaborDay #LaborDayWeekend,Video,35810.0,108972.0,waterloosparkling
3183817990792645927,waterloosparkling,https://www.instagram.com/p/CwvMZgSsY0n/,3,117,2023-09-03T17:05:05.000Z,4931723958,What we mean when we say we’ve got a board meeting… 🏄‍♂️ 📸: @Mayelasjourney #WaterlooSparklingWater #WaterDownNothing #DoYouWaterloo #paddleboarding #summerfun,Image,,,waterloosparkling
3190305706044764328,waterloosparkling,https://www.instagram.com/p/CxGPiL9szCo/,1175,12694,2023-09-12T15:54:58.000Z,4931723958,"⚽ POP-UP CHAIR GIVEAWAY ⚽  Ready for the best seats in the house – wherever you are? Here’s your chance to score a pair of our limited-edition Waterloo pop-up chairs! How to enter:  ⚽ Like this post ⚽ Follow @waterloosparkling ⚽ Tag a fellow superfan in the comments  To participate in this sweepstakes, entrants must be 18+ and reside in the U.S. (excludes Alaska and Hawaii). One lucky winner will be notified via DM from Waterloo’s verified account on 9/19. #WaterlooSparklingWater #WaterDownNothing #giveaway",Image,,,waterloosparkling
3185899526212961679,waterloosparkling,https://www.instagram.com/p/Cw2lr1OAkWP/,120,17682,2023-09-06T14:00:40.000Z,4931723958,🍎BIG FLAVOR COMEBACK! 🍂 The wait is over. Spiced Apple and Cranberry are back – for a limited time only – rolling onto shelves over the coming weeks. See link in bio to find online or at a store near you. Which is your fall go-to? #WaterlooSparklingWater #WaterDownNothing #newflavor #fallflavors #sparklingwater,Image,,,waterloosparkling
3186849191342932080,laflamablanca95,https://www.instagram.com/p/Cw59nRyvIBw/,90,10659,2023-09-07T21:29:37.000Z,683762681,Football’s back and so are @Waterloosparkling’s Spiced Apple and Cranberry flavors. Did a little taste test with the @greenlight crew and one thing is for sure - both flavors will be in the cooler this season. #waterloopartner,Video,1025046.0,1978516.0,waterloosparkling
3187465611932399345,waterloosparkling,https://www.instagram.com/p/Cw8JxYNAPbx/,27,214,2023-09-08T17:52:12.000Z,4931723958,What's your go-to fall flavor for the weekend - Spiced Apple or Cranberry?  #WaterlooSparklingWater #WaterDownNothing #fallflavors #SparklingWater,Sidecar,,,waterloosparkling
3180240030831069908,waterloosparkling,https://www.instagram.com/p/Cwie3VaKcrU/,16,1006,2023-08-29T18:37:16.000Z,4931723958,Defender Casey Krueger is crushing it on the @chicagoredstars pitch — and behind the bar with muddler in hand. See how Casey makes the crowd-pleasing @waterloosparkling CHERRY BERRY SMASH mocktail featuring Waterloo Cherry Limeade Sparkling Water. Link in bio for the recipe! #WaterlooSparklingWater #WaterDownNothing  #DoYouWaterloo #mocktail #soccerlife,Video,126643.0,319478.0,waterloosparkling
3177331187734232248,waterloosparkling,https://www.instagram.com/p/CwYJeGKqzy4/,14,241,2023-08-25T18:38:01.000Z,4931723958,"It's not an obsession, it's a lifestyle. 🍓🍒🍋🏈 #WaterlooSparklingWater #WaterDownNothing #DoYouWaterloo #football #aiArt",Video,31023.0,99466.0,waterloosparkling


In [0]:
master_df = (master_df
            .withColumnRenamed("id", "Post_id")
            .withColumnRenamed("ownerUsername", "Post_username")# join profile on brand account name
            .withColumnRenamed("Brand_Association", "Post_brand") 
            .withColumnRenamed("url", "Post_url")
            .withColumnRenamed("commentsCount", "Post_comments")
            .withColumnRenamed("likesCount", "Post_likes")
            .withColumnRenamed("timestamp", "Post_timestamp")
            .withColumnRenamed("ownerId", "Post_ownerId") # join profile on brand account id
            .withColumnRenamed("caption", "Post_text")
            .withColumnRenamed("type", "Post_type")
            .withColumnRenamed("videoViewCount", "Post_videoView")
            .withColumnRenamed("videoPlayCount", "Post_videoPlay")
            )


In [0]:
master_df.show()

+-------------------+-----------------+--------------------+-------------+----------+--------------------+------------+--------------------+---------+--------------+--------------+-----------------+
|            Post_id|    Post_username|            Post_url|Post_comments|Post_likes|      Post_timestamp|Post_ownerId|           Post_text|Post_type|Post_videoView|Post_videoPlay|       Post_brand|
+-------------------+-----------------+--------------------+-------------+----------+--------------------+------------+--------------------+---------+--------------+--------------+-----------------+
|3192015323732758344|waterloosparkling|https://www.insta...|            6|       151|2023-09-15T00:31:...|  4931723958|New season—new mo...|  Sidecar|          NULL|          NULL|waterloosparkling|
|3192449134413029446|waterloosparkling|https://www.insta...|            3|        83|2023-09-15T14:53:...|  4931723958|Gameday ritual #7...|    Image|          NULL|          NULL|waterloosparkling|
|3181

##### 1.2 Account (Profile) Metrics

In [0]:
pfl_path = base_s3_pfl_path.format('all-brands-profile')
pfl_df = spark.read.json(pfl_path)

pfl_metrics = [
    "element.id", "element.username", "element.fullName", "element.followersCount", "element.followsCount", 
    "element.postsCount", "element.url"
]

exploded_df_pfl = pfl_df.select(explode(pfl_df.data).alias("element"))
pfl_df = exploded_df_pfl.select(*pfl_metrics)

In [0]:
pfl_df = (pfl_df
          .withColumnRenamed("id", "Account_id")
          .withColumnRenamed("fullName", "Brand_name")
          .withColumnRenamed("username", "Account_name")
          .withColumnRenamed("followersCount", "Account_followersCount")
          .withColumnRenamed("followsCount", "Account_followsCount")
          .withColumnRenamed("postsCount", "Account_postsCount")
          .withColumnRenamed("url", "Account_url")
          )


#### Part 2 - Data Handling

##### 2.1 Normalization?

In [0]:
post_attributes_df = master_df.select("Post_id", 
                          "Post_username",
                          "Post_ownerId",
                          "Post_comments",
                          "Post_likes",
                          "Post_timestamp",
                          "Post_url")

In [0]:
post_attributes_df.show(100)

+-------------------+-----------------+------------+-------------+----------+--------------------+--------------------+
|            Post_id|    Post_username|Post_ownerId|Post_comments|Post_likes|      Post_timestamp|            Post_url|
+-------------------+-----------------+------------+-------------+----------+--------------------+--------------------+
|3192015323732758344|waterloosparkling|  4931723958|            6|       151|2023-09-15T00:31:...|https://www.insta...|
|3192449134413029446|waterloosparkling|  4931723958|            3|        83|2023-09-15T14:53:...|https://www.insta...|
|3181641419561905486|waterloosparkling|  4931723958|            3|       733|2023-08-31T17:03:...|https://www.insta...|
|3183817990792645927|waterloosparkling|  4931723958|            3|       117|2023-09-03T17:05:...|https://www.insta...|
|3190305706044764328|waterloosparkling|  4931723958|         1175|     12694|2023-09-12T15:54:...|https://www.insta...|
|3185899526212961679|waterloosparkling| 

In [0]:
post_details_df = master_df.select("Post_id", 
                          "Post_username",
                          "Post_ownerId",
                          "Post_text",
                          "Post_type",
                          "Post_videoView",
                          "Post_videoPlay")

In [0]:
post_details_df.show(100)

+-------------------+-----------------+------------+--------------------+---------+--------------+--------------+
|            Post_id|    Post_username|Post_ownerId|           Post_text|Post_type|Post_videoView|Post_videoPlay|
+-------------------+-----------------+------------+--------------------+---------+--------------+--------------+
|3192015323732758344|waterloosparkling|  4931723958|New season—new mo...|  Sidecar|          NULL|          NULL|
|3192449134413029446|waterloosparkling|  4931723958|Gameday ritual #7...|    Image|          NULL|          NULL|
|3181641419561905486|waterloosparkling|  4931723958|#LaborDay Slay: P...|    Video|         35810|        108972|
|3183817990792645927|waterloosparkling|  4931723958|What we mean when...|    Image|          NULL|          NULL|
|3190305706044764328|waterloosparkling|  4931723958|⚽ POP-UP CHAIR GI...|    Image|          NULL|          NULL|
|3185899526212961679|waterloosparkling|  4931723958|🍎BIG FLAVOR COME...|    Image|      

In [0]:
brand_df = (pfl_df.select("Brand_name")
                   .withColumn("Brand_id", monotonically_increasing_id()))

In [0]:
brand_df.show(100)

+--------------------+--------+
|          Brand_name|Brand_id|
+--------------------+--------+
|LaCroix Sparkling...|       0|
|bubly sparkling w...|       1|
|      Perrier Canada|       2|
|      Monster Energy|       3|
|        S.Pellegrino|       4|
|Waterloo Sparklin...|       5|
|Spindrift Sparkli...|       6|
|           Schweppes|       7|
|    Pure Life Canada|       8|
|          Montellier|       9|
+--------------------+--------+



In [0]:
account_df = pfl_df.select("Account_id", 
                          "Account_name",
                          "Account_followersCount",
                          "Account_followsCount",
                          "Account_postsCount",
                          "Account_url")

In [0]:
account_df.show() 

+-----------+-----------------+----------------------+--------------------+------------------+--------------------+
| Account_id|     Account_name|Account_followersCount|Account_followsCount|Account_postsCount|         Account_url|
+-----------+-----------------+----------------------+--------------------+------------------+--------------------+
|  597611708|     lacroixwater|                204222|                 436|              5689|https://www.insta...|
| 6268579358|       bublywater|                 52951|                 121|               717|https://www.insta...|
|   18360162|    perriercanada|                  6358|                 645|               989|https://www.insta...|
|   14653744|    monsterenergy|               8559977|                1332|              8120|https://www.insta...|
|42560281358|  sanpellegrinoca|                  1611|                  64|                95|https://www.insta...|
| 4931723958|waterloosparkling|                 56380|                 5

In [0]:
Partnership_account = master_df.select( "Post_username",
                          "Post_ownerId",
                          "Post_brand").distinct() # post_brand is brand'username, not brand name

In [0]:
Partnership_account.show(100) 

+------------------+------------+-----------------+
|     Post_username|Post_ownerId|       Post_brand|
+------------------+------------+-----------------+
|    purelifecanada|  7475812957|   purelifecanada|
|        bublywater|  6268579358|       bublywater|
|            xgames|     9190921|    monsterenergy|
|             gopro|    28902942|    monsterenergy|
|       monsterarmy|   222624161|    monsterenergy|
|   strickland_mma_| 54427630273|    monsterenergy|
|     monstergaming|    29891576|    monsterenergy|
|     monsterenergy|    14653744|    monsterenergy|
|    class1official| 38043412513|    monsterenergy|
|      monstermusic|    43545679|    monsterenergy|
|      hailiedeegan|   289393301|    monsterenergy|
|     briandeegan38|    20416882|    monsterenergy|
|   bulletvalentina|  1579112483|    monsterenergy|
|              mxgp|   201585872|    monsterenergy|
|monsterenergygirls|  1528418435|    monsterenergy|
|          brendog1|     8920714|    monsterenergy|
|           

In [0]:
master_df.write.parquet("s3://s3-apify-instagram-raw-dta/instagram/master_df.parquet")