In [11]:
import os, pandas as pd
import numpy as np
import datetime as dt
import time

from pyspark.context import SparkContext, SparkConf
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Dataset

The dataset can be downloaded from: https://www.kaggle.com/datasets/shmalex/instagram-dataset?select=instagram_locations.csv

It contains information on 42M Posts, 1.2M Locations and 4.5M Profiles from Instagram from 2010 to 2019.

# Firestore

In [28]:
%env GOOGLE_APPLICATION_CREDENTIALS=/Users/andreasrousos/Desktop/Fall 2022/Big Data/final project/final-project-372202-aaf7027fa45e.json

env: GOOGLE_APPLICATION_CREDENTIALS=/Users/andreasrousos/Desktop/Fall 2022/Big Data/final project/final-project-372202-aaf7027fa45e.json


Data was uploaded to Google Cloud's Firestore using the Firefoo GUI, used for easier browsing, querying and editing Firestore data. Trial version of Firefoo was available for 2 weeks.

more info on:  
https://firefoo.app/

Users were added later to Firestore, using batches as advised by the docs.

In [29]:
from google.cloud import firestore

db = firestore.Client()

In [30]:
batch_size=499
for i in range(0,len(user_list), batch_size):
    list_batch=user_list[i:i+batch_size]
    batch = db.batch()
    for item in list_batch:
        doc_ref = db.collection(u'users').document(str(item['sid']))
        doc_ref.set(item)
    batch.commit()

In [83]:
posts_ref=db.collection('instagram_posts')
user_ref=db.collection('users')
locs_ref=db.collection('locations')
demo_ref=db.collection('posts_demo')

## Queries

Lets query some data to check if everything is working fine, and explore part of the data.

First, lets find the 10 most liked posts in the dataset that also have location info provided:

In [84]:
#This query required a composite index that was created on Firebase
#order_by location_id was used to filter out documents that dont have location info.

top10_posts=posts_ref.order_by('location_id',direction=firestore.Query.DESCENDING)\
                    .order_by('numbr_likes',direction=firestore.Query.DESCENDING)\
                    .limit(10).get()

import pprint

top10_list=[doc.to_dict() for doc in top10_posts]
for doc in top10_list:
    pprint.pprint(doc)
    print('\n')

Let's also check if we have users with more than 5M followers.

In [161]:
most_followers=user_ref.where('followers','>',5000000)\
                        .order_by('followers',direction=firestore.Query.DESCENDING)\
                        .get()

for user in most_followers:
    pprint.pprint(user.to_dict())
    print('\n')

{'cts': '2019-08-03 09:49:20.138 -0400',
 'description': 'Actress @ayamjerit @vavacake_premiumcake @kamaliabeauty '
                '@villakamalia @villajunabali  @titidantian @bumbubytitikamal '
                '\\nYoutube channel: Titi Dan Tian',
 'firstname_lastname': 'Titi Kamal',
 'followers': 9066869.0,
 'following': 895.0,
 'is_business_account': True,
 'n_posts': 3840.0,
 'profile_id': 29837504.0,
 'profile_name': 'titi_kamall',
 'sid': 4218652,
 'url': 'youtu.be/Bl1NS5BWDFw'}


{'cts': '2019-08-03 06:45:44.197 -0400',
 'description': '@Chermarn arn.collection @misswintersnow #chermarnishappy',
 'firstname_lastname': 'Chermarn Boonyasak',
 'followers': 7510511.0,
 'following': 1160.0,
 'is_business_account': False,
 'n_posts': 8224.0,
 'profile_id': 3896328.0,
 'profile_name': 'chermarn',
 'sid': 4216424,
 'url': nan}


{'cts': '2019-08-02 18:38:47.794 -0400',
 'description': 'Periodista, conductor y productor. Vice Presidente de San '
                'Lorenzo de Almagro',
 'fi

Since the query operators are limited in NoSQL DBs there is only so much that we can query. More queries can be easily applied through the Google Cloud Platform's Firestore Query builder. 

# Convert to JSON

I converted everything to a JSON format, which has a closer structure to the Documents in Firestore. This was done for data processing purposes, as querying the database each time to process data for the project would be costly (Firestore's charges for reads). So this is equivalent to exporting the data from Firestore once (read 1 time only), and use them for data processing or uploading it to a cluster.

In [2]:
conf = SparkConf().setMaster('local[*]') 
sc = SparkContext.getOrCreate(conf)
spark = SparkSession(sc)

23/01/18 07:49:24 WARN Utils: Your hostname, Andreass-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.0.10 instead (on interface en0)
23/01/18 07:49:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/18 07:49:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/01/18 07:49:26 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Schemata

The below schemata were used to enforce some datatypes and some structure to the data.

In [3]:
user_schema=StructType([
    StructField('sid', LongType(), True),
    StructField('profile_id', LongType(), True),
    StructField('profile_name', StringType(), True),
    StructField('firstname_lastname', StringType(), True),
    StructField('description', StringType(), True),
    StructField('following', IntegerType(), True),
    StructField('followers', IntegerType(), True),
    StructField('n_posts', IntegerType(), True),
    StructField('url', StringType(), True),
    StructField('cts', TimestampType(), True),
    StructField('is_business_account', StringType(), True)
])

In [4]:
loc_schema=StructType([
    StructField('sid', LongType(), True),
    StructField('id', LongType(), True),
    StructField('name', StringType(), True),
    StructField('street', StringType(), True),
    StructField('zip', StringType(), True),
    StructField('city', StringType(), True),
    StructField('region', StringType(), True),
    StructField('cd', StringType(), True),
    StructField('phone', StringType(), True),
    StructField('aj_exact_city_match', StringType(), True),
    StructField('aj_exact_country_match', StringType(), True),
    StructField('blurb', StringType(), True),
    StructField('dir_city_id', StringType(), True),
    StructField('dir_city_name', StringType(), True),
    StructField('dir_city_slug', StringType(), True),
    StructField('dir_country_id', StringType(), True),
    StructField('dir_country_name', StringType(), True),
    StructField('lat', DoubleType(), True), 
    StructField('lng', DoubleType(), True),
    StructField('primary_alias_on_fb', StringType(), True),
    StructField('slug', StringType(), True),
    StructField('website', StringType(), True),
    StructField('cts', TimestampType(), True)
])

In [5]:
post_schema = StructType([
    StructField('sid', IntegerType(), False),
    StructField('sid_profile', IntegerType(), False),
    StructField('post_id', StringType(), False),
    StructField('profile_id', IntegerType(), False),
    StructField('location_id', IntegerType(), True),
    StructField('cts', TimestampType(), False),
    StructField('post_type', IntegerType(), True),
    StructField('description', StringType(), True),
    StructField('numbr_likes', IntegerType(), True),
    StructField('number_comments', IntegerType(), True)
])

## Collections

In [6]:
users_df = spark.read.load("archive/instagram_profiles.csv", \
    format="csv", 
    sep="\t",
    schema=user_schema,
    header="true")

users_df.coalesce(1).write.json('data/users.json')

In [7]:
locations_df = spark.read.load("archive/instagram_locations.csv", \
    format="csv", 
    sep="\t",
    schema=loc_schema,
    header="true")

locations_df.coalesce(1).write.json('data/locations.json')

In [8]:
posts_df = spark.read.load("archive/instagram_posts.csv", \
    format="csv", 
    sep="\t",
    schema=post_schema,
    header="true")

posts_df.coalesce(1).write.json('data/posts.json')

## Data for Streaming Demo

For the streaming data demonstration we will need the last 30 days of posts from the dataset for which we have location information.

In [10]:
posts_df=posts_df.dropna(subset=['location_id'])
posts_df.count()

                                                                                

20426102

In [82]:
max_date = posts_df.agg({"cts": "max"}).collect()[0][0]
d=max_date-dt.timedelta(days=30)
print('period_start:',d,'\nperiod_end:',max_date)



period_start: 2019-07-31 06:21:59 
period_end: 2019-08-30 06:21:59


                                                                                

In [14]:
last_month_df=posts_df.filter(posts_df.cts > d).sort('cts')
last_month_df.count()

                                                                                

270431

Now that the data has been reduced significantly, let's also add some location details by joining some attributes from the location data.

In [15]:
loc_summarized=locations_df.select('id','name','city','cd')

In [70]:
joined_df=last_month_df.join(loc_summarized,
              last_month_df.location_id== loc_summarized.id,
              how='inner')
joined_df.printSchema()

root
 |-- sid: integer (nullable = true)
 |-- sid_profile: integer (nullable = true)
 |-- post_id: string (nullable = true)
 |-- profile_id: integer (nullable = true)
 |-- location_id: integer (nullable = true)
 |-- cts: timestamp (nullable = true)
 |-- post_type: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- numbr_likes: integer (nullable = true)
 |-- number_comments: integer (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- cd: string (nullable = true)



In [162]:
joined_df.persist()
joined_df.count()

                                                                                

211432

In [75]:
#Convert columns to Map
joined_df = joined_df.withColumn("location",create_map(
        lit("id"),col("id"),
        lit("name"),col("name"),
        lit("city"),col("city"),
        lit("country"),col("cd")
        )).drop("location_id","id",'name','city','cd')
joined_df.printSchema()

root
 |-- sid: integer (nullable = true)
 |-- sid_profile: integer (nullable = true)
 |-- post_id: string (nullable = true)
 |-- profile_id: integer (nullable = true)
 |-- cts: timestamp (nullable = true)
 |-- post_type: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- numbr_likes: integer (nullable = true)
 |-- number_comments: integer (nullable = true)
 |-- location: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [76]:
joined_df.select('location').show(5,truncate=False)

+----------------------------------------------------------------------------------------------------+
|location                                                                                            |
+----------------------------------------------------------------------------------------------------+
|{id -> 45298, name -> KW Institute for Contemporary Art, city -> Berlin, Germany, country -> DE}    |
|{id -> 133107, name -> New York Deli RVA, city -> Richmond, Virginia, country -> US}                |
|{id -> 192201, name -> Lerkendal Stadion, city -> Trondheim, Norway, country -> NO}                 |
|{id -> 200309, name -> The BOILEROOM, city -> Guildford, country -> GB}                             |
|{id -> 259937, name -> Pembroke College Cambridge, city -> Cambridge, Cambridgeshire, country -> GB}|
+----------------------------------------------------------------------------------------------------+
only showing top 5 rows



In [77]:
joined_df.coalesce(1).write.json('data/last30_days_with_locs.json',header=True)
joined_df.unpersist()

                                                                                

DataFrame[sid: int, sid_profile: int, post_id: string, profile_id: int, cts: timestamp, post_type: int, description: string, numbr_likes: int, number_comments: int, location: map<string,string>]