# Project Title

-description-

## Table of Contents

1. Importing modules

2. Data processing

...

## 1. Importing modules

In [1]:
import csv
from zipfile import ZipFile
from pyspark.rdd import RDD
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc

## 2. Data processing

In [2]:
#Initialize a spark session.
def init_spark():
    """Build a SparkSession, or return the existing one via ``getOrCreate()``.

    The session is named "Python Spark SQL basic example". The single config
    entry ("spark.some.config.option") looks like a placeholder from the Spark
    examples -- TODO confirm whether it is needed.

    Returns:
        The SparkSession produced by the builder.
    """
    builder = SparkSession.builder
    builder = builder.appName("Python Spark SQL basic example")
    builder = builder.config("spark.some.config.option", "some-value")
    return builder.getOrCreate()

In [3]:
# Session handle used by the cells below (textFile, read.json).
spark = init_spark()

In [6]:
# init_spark() ends in getOrCreate(), so re-running this cell re-uses the session.
spark = init_spark()

lines = spark.sparkContext.textFile('data/influencers.txt')

# get category and username index
# The first two lines of the file are treated as header rows; the column names
# are read from the first of them.
headers = lines.take(2)
header = headers[0]
header_columns = header.split("\t")  # split once, reuse for both lookups
category_index = header_columns.index("Category")
username_index = header_columns.index("Username")

# filter travel influencers
lines = lines.filter(lambda line: line not in headers)
lines = lines.map(lambda line: line.split("\t"))
travel_influencers = lines.filter(lambda line: line[category_index] == 'travel')
# get all travel influencers IG username
travel_usernames = travel_influencers.map(lambda line: line[username_index])

print(travel_influencers.count())
print(travel_usernames.count())
# Only the 4th username is displayed, so fetch four elements instead of
# collecting the whole RDD onto the driver.
print(travel_usernames.take(4)[3])

4210
4210
wake.up.matt


In [12]:
# Extract, from the post-metadata archive, every member whose path contains the
# username of a travel influencer.

# Collect the usernames once on the driver; previously collect() was re-run
# (a full Spark job) for every archive member.
travel_username_list = travel_usernames.collect()

with ZipFile('data/post-metadata/fix.zip', 'r') as zipObject:
    names = zipObject.namelist()
    extracted_count = 0
    for file_name in names:
        # NOTE(review): this is a substring match, so a username contained in
        # another username (or in a folder name) also matches -- confirm
        # against the archive's naming scheme.
        if any(i in file_name for i in travel_username_list):
            # Extract a travel influencers post metadata from zip
            zipObject.extract(file_name, 'data/post-metadata')
            extracted_count += 1

# Report once, after the loop, with an accurate description of what was extracted.
print('Extracted', extracted_count, 'post-metadata files for travel influencers')

['zip/', 'zip/test1.txt', 'zip/test2.txt', 'zip/test3.txt']
zip/test1.txt
zip/test2.txt


# Data Processing: Reading Data Files and Converting Them to RDDs

### All of the following cells need to be executed unless a cell is marked as OPTIONAL

In [64]:
# Glob pattern for the folder where the .info files are located.
# MODIFY THE SOURCE DIRECTORY ACCORDINGLY
# (another location used previously: '../data_instagram/deleteme/*.info')
source_directory = './test_01/*.info'

In [92]:
# Preprocessing helpers that extract the required fields from each post record
def extract_counts(row, field):
    """Return ``row[field]['count']``, or 0 when any part of that path is missing.

    Args:
        row: Mapping-like record supporting ``in`` and ``[]`` lookups.
        field: Name of the entry that should hold a ``'count'`` value.

    Returns:
        The stored count, or 0 if ``field`` is absent, its value is None,
        it has no ``'count'`` entry, or that entry is None.
    """
    if field not in row or row[field] is None:
        return 0

    container = row[field]
    if 'count' not in container:
        return 0

    value = container['count']
    return 0 if value is None else value

def likes(row):
    """Return the count stored under ``edge_media_preview_like`` (0 when unavailable)."""
    like_field = 'edge_media_preview_like'
    return extract_counts(row, like_field)

def comments_count(row):
    """Return the count stored under ``edge_media_to_parent_comment`` (0 when unavailable)."""
    comment_field = 'edge_media_to_parent_comment'
    return extract_counts(row, comment_field)

def extract_nodes_from_edges(row, field, secondary_fields):
    """Collect one nested value per edge from ``row[field]['edges']``.

    For every edge that has a truthy ``'node'``, the keys in
    ``secondary_fields`` are followed in order starting from that node
    (e.g. ``['user', 'id']`` reads ``node['user']['id']``). The final value is
    kept only when every key along the path exists and is truthy; edges whose
    path is incomplete are skipped.

    Args:
        row: Mapping-like record supporting ``in`` and ``[]`` lookups.
        field: Name of the entry holding an ``'edges'`` collection.
        secondary_fields: Ordered keys to follow below each edge's node.

    Returns:
        A list of the resolved values; an empty list when ``field`` or its
        ``'edges'`` entry is missing or None.
    """
    if field not in row or row[field] is None \
    or 'edges' not in row[field] or row[field]['edges'] is None:
        return []

    result = []
    for edge in row[field]['edges']:
        if 'node' not in edge or not edge['node']:
            continue

        value = edge['node']
        for key in secondary_fields:
            if key in value and value[key]:
                value = value[key]
            else:
                # Path is broken for this edge: stop descending and skip it.
                break
        else:
            # Loop finished without a break, i.e. the full path resolved.
            # (The previous condition was inverted and kept only failures.)
            result.append(value)

    return result
    
def extract_tagged_users_id(row):
    """Collect the tagged users' ids of a post.

    Delegates to ``extract_nodes_from_edges`` with the path
    ``edge_media_to_tagged_user.edges[i].node.user.id``.

    Returns:
        The list built by the helper; empty when the field or its edges are
        missing or None.
    """
    #edge_media_to_tagged_user.edges.[i].node.user.id
    return extract_nodes_from_edges(row, 'edge_media_to_tagged_user', ['user', 'id'])

def extract_commenters_id(row):
    """Collect the ids of the users who commented on a post.

    Delegates to ``extract_nodes_from_edges`` with the path
    ``edge_media_to_parent_comment.edges[i].node.owner.id``.

    Returns:
        The list built by the helper; empty when the field or its edges are
        missing or None.
    """
    #edge_media_to_parent_comment.edges.[i].node.owner.id
    return extract_nodes_from_edges(row, 'edge_media_to_parent_comment', ['owner', 'id'])

def extract_text_from_caption(row):
    """Collect the caption texts found at ``edge_media_to_caption.edges[i].node.text``.

    Args:
        row: Mapping-like record supporting ``in`` and ``[]`` lookups.

    Returns:
        A list with one text per edge that has a truthy node and a non-None
        ``'text'`` value; an empty list when the field or its edges are
        missing or None.
    """
    #edge_media_to_caption.edges.node.text
    if 'edge_media_to_caption' not in row or row['edge_media_to_caption'] is None:
        return []

    caption = row['edge_media_to_caption']
    if 'edges' not in caption or caption['edges'] is None:
        return []

    # A None text is skipped: the result is later joined with str.join, which
    # raises TypeError on non-string items.
    return [
        edge['node']['text']
        for edge in caption['edges']
        if 'node' in edge and edge['node']
        and 'text' in edge['node'] and edge['node']['text'] is not None
    ]

def extract_location(row):
    """Return the post's location name and id as a two-key dict.

    Args:
        row: Mapping-like record supporting ``in`` and ``[]`` lookups.

    Returns:
        ``{'location_name': ..., 'location_id': ...}``. Each value defaults to
        an empty string and is replaced by ``row['location']['name']`` /
        ``row['location']['id']`` when ``row['location']`` is truthy and
        contains that key.
    """
    name = ''
    identifier = ''

    if 'location' in row and row['location']:
        place = row['location']
        if 'name' in place:
            name = place['name']
        if 'id' in place:
            identifier = place['id']

    return {'location_name': name, 'location_id': identifier}
    
def extract_post_owner_username(row):
    """Return ``row['owner']['username']``, or '' when it cannot be read.

    An empty string is returned when ``'owner'`` is absent or None, or when
    the owner has no ``'username'`` entry or that entry is None.
    """
    if 'owner' in row and row['owner'] is not None:
        owner = row['owner']
        if 'username' in owner and owner['username'] is not None:
            return owner['username']
    return ''
    
def create_post_as_json(row):
    """Turn one raw post record into a flat dict of the fields used downstream.

    ``row['id']`` is read unconditionally (a falsy id becomes ''); every other
    field goes through its extract helper.

    Returns:
        A dict with the keys ``post_id``, ``location_name``, ``location_id``,
        ``count_likes``, ``owner_username``, ``captions``,
        ``tagged_users_id`` and ``commenters_id``. The last three hold lists.
    """
    identifier = row['id'] or ''
    place = extract_location(row)
    author = extract_post_owner_username(row)
    caption_texts = extract_text_from_caption(row)

    return {
        'post_id': identifier,
        'location_name' : place['location_name'],
        'location_id' : place['location_id'],
        'count_likes': likes(row),
        'owner_username': author,
        'captions': caption_texts,
        'tagged_users_id': extract_tagged_users_id(row),
        'commenters_id': extract_commenters_id(row)
    }

def flatten_json_lists(row):
    """Replace the list-valued fields of ``row`` with joined strings, in place.

    CSV export does not allow arrays, so ``captions`` is joined with '. ' and
    ``tagged_users_id`` / ``commenters_id`` with ', '.

    Returns:
        The same ``row`` object, after modification.
    """
    separators = (
        ('captions', '. '),
        ('tagged_users_id', ', '),
        ('commenters_id', ', '),
    )
    for key, separator in separators:
        row[key] = separator.join(row[key])
    return row

def convert_json_to_tuple(row):
    """Convert a post dict into a tuple with a fixed field order.

    Returns:
        ``(post_id, location_name, location_id, count_likes, owner_username,
        captions, tagged_users_id, commenters_id)``.
    """
    field_order = (
        'post_id',
        'location_name',
        'location_id',
        'count_likes',
        'owner_username',
        'captions',
        'tagged_users_id',
        'commenters_id',
    )
    return tuple(row[key] for key in field_order)
    

def remove_carry_returns(row):
    """Strip line breaks from ``row['captions']`` in place and return ``row``.

    Carriage returns are removed and newlines are replaced by a single space.
    ``row['captions']`` must already be a string.
    """
    caption = row['captions']
    for old, new in (('\r', ''), ('\n', ' ')):
        caption = caption.replace(old, new)
    row['captions'] = caption
    return row
    
def create_post_as_tuple(row):
    """Build the output tuple for one raw post record in a single step (NOT USED).

    Here the caption texts are joined with a newline, and the tagged-user and
    commenter ids with ', '.

    Returns:
        ``(post_id, location_name, location_id, count_likes, owner_username,
        captions, tagged_users_id, commenters_id)``.
    """
    identifier = row['id'] or ''
    place = extract_location(row)
    author = extract_post_owner_username(row)
    caption_text = '\n'.join(extract_text_from_caption(row))
    like_total = likes(row)
    tagged = ', '.join(extract_tagged_users_id(row))
    commenters = ', '.join(extract_commenters_id(row))

    fields = [
        identifier,
        place['location_name'],
        place['location_id'],
        like_total,
        author,
        caption_text,
        tagged,
        commenters,
    ]
    return tuple(fields)


In [65]:
#Read data from source directory
# `source_directory` is the glob pattern assigned in an earlier cell.
df = spark.read.json(source_directory)
# The preprocessing functions are applied to this RDD with rdd.map(...) in a later cell.
rdd =  df.rdd

In [93]:
#transform data to the needed format
# Stages, in order: raw record -> dict -> lists joined into strings ->
# line breaks stripped from captions -> tuple.
clean_data = (
    rdd.map(create_post_as_json)
       .map(flatten_json_lists)
       .map(remove_carry_returns)
       .map(convert_json_to_tuple)
)


In [94]:
#OPTIONAL, run this if you want to see the data printed
# The column names are defined here so this cell also works on a fresh
# top-to-bottom run; elsewhere the notebook assigns `schema` only in a cell
# located further down.
schema = ['post_id', 'location_name', 'location_id', 'count_likes',
          'owner_username', 'captions', 'tagged_users_id', 'commenters_id']
df = clean_data.toDF(schema)

In [95]:
# OPTIONAL: run this if you want to see the data printed.
# The previous cell must be run first, because it builds `df`.
df.show()

+-------------------+--------------------+----------------+-----------+--------------------+--------------------+---------------+-------------+
|            post_id|       location_name|     location_id|count_likes|      owner_username|            captions|tagged_users_id|commenters_id|
+-------------------+--------------------+----------------+-----------+--------------------+--------------------+---------------+-------------+
|1991526353976830561| Ristorante  Il Moro|       243270193|       1269|     abakumovanastia|[ P E S C E ] Как...|               |             |
|1825852844165346062|Half Moon Bay, Ca...|       213762097|       4851|        abekislevitz|This month marks ...|               |             |
|1749451773011353261|     Capitoline Hill|       221222985|       1330|     abakumovanastia|[ T E R R A Z Z A...|               |             |
|1668226920279477602|Terrazza Della Ri...| 153841095220974|       1297|     abakumovanastia|[ L E G G E N D A...|               |       

In [66]:

# Column names for the cleaned data, in the order of the tuples produced by
# convert_json_to_tuple.
schema = [
    'post_id',
    'location_name',
    'location_id',
    'count_likes',
    'owner_username',
    'captions',
    'tagged_users_id',
    'commenters_id',
]


In [91]:
#save data into csv files
# Uses Spark's built-in CSV writer; "com.databricks.spark.csv" is the legacy
# name of the external package from before CSV support was built in.
# NOTE: with the default save mode the write fails if the output directory
# already exists, so remove "csv_formated_data" before re-running this cell.
clean_data.toDF(schema).write.csv("csv_formated_data", header=True)
