In [1]:
import pandas as pd
import numpy as np
import json
import os
import glob
import multiprocessing as mp
from time import time
import socket
from timeit import default_timer as timer


import warnings
warnings.filterwarnings('ignore')

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf,desc,row_number,col,year,month,dayofmonth,dayofweek,to_timestamp,size,isnan,when,count,col,count,lit,sum
import pyspark.sql.functions as F
from pyspark.sql.types import *
from py4j.java_gateway import java_import
from functools import reduce
from pyspark.sql import DataFrame
from pyspark import SparkContext

# 1. Initialisation

In [3]:
# Driver memory is handed to PySpark through the PYSPARK_SUBMIT_ARGS
# environment variable.
memory = '10g'
pyspark_submit_args = ' --driver-memory {} pyspark-shell'.format(memory)
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

In [4]:
# Reuse `spark` when the name is already defined in this kernel;
# otherwise build a local session.
try:
    spark
except NameError:
    print('Create Local SparkSession')
    spark = (
        SparkSession.builder
        .config("spark.driver.host", "localhost")
        .appName("extract-timelines")
        .getOrCreate()
    )

# Turn on the ignoreCorruptFiles and Arrow execution flags for this session
for _conf_key in ("spark.sql.files.ignoreCorruptFiles",
                  "spark.sql.execution.arrow.enabled"):
    spark.conf.set(_conf_key, "true")

sc = spark.sparkContext

Create Local SparkSession


In [5]:
# Root data directory and the two sub-trees shared by the paths below
path_to_data = "../data/"
_api_dir = os.path.join(path_to_data, 'timelines', 'API')
_chunks_dir = os.path.join(path_to_data, 'chunks')

# Full timelines: input directory and chunked output directory
path_to_timeline = os.path.join(path_to_data, 'timelines/API/IDF-clean/')
path_to_output_timelines = os.path.join(_chunks_dir, 'IDF_departments')

# Updates: input directory and chunked output directory
path_to_updates = os.path.join(_api_dir, 'IDF-updates')
path_to_output_updates = os.path.join(_chunks_dir, 'IDF-updates')

In [6]:
# Mode switch: True processes the update files, False the full timelines.
updating = True

In [7]:
# Pick the input, output and per-user directories for the selected mode.
# The flag is tested by truthiness rather than compared with `== False`.
if not updating:
    # Full extraction from the complete timelines
    path_to_input = path_to_timeline
    path_to_output = path_to_output_timelines
    path_to_users = os.path.join(path_to_data, 'timelines', 'API', 'all_users_ids')

else:
    # Incremental run over the update files
    path_to_input = path_to_updates
    path_to_output = path_to_output_updates
    path_to_users = os.path.join(path_to_data, 'timelines', 'API', 'most_recent_id')

In [8]:
print('List files to be processed...')

# List the input directory through the Hadoop FileSystem API reached via
# the session's JVM gateway.
hadoop_fs = spark._jvm.org.apache.hadoop.fs
fs = hadoop_fs.FileSystem.get(spark._jsc.hadoopConfiguration())
list_status = fs.listStatus(hadoop_fs.Path(path_to_input))

# Keep entries whose full path string contains 'json' and strip the
# 'hdfs://dumbo' prefix from them.
paths = [
    uri.replace('hdfs://dumbo', '')
    for uri in (status.getPath().toString() for status in list_status)
    if 'json' in uri
]

# Sort first, then shuffle with a fixed seed so the order is repeatable.
np.random.seed(0)
paths = np.random.permutation(sorted(paths))

print('# Files:', len(paths))

List files to be processed...
# Files: 106


In [9]:
# Number of groups the file list is split into; each group is processed
# as one unit.
n_chunks = 10
print('# Chunks:', n_chunks)
paths_chunks = np.array_split(paths, indices_or_sections=n_chunks)

# Chunks: 10


# 2. Extract timelines

In [10]:
def extract_chunk(i_chunk,paths_chunk) :
    """Read one chunk of JSON timeline files and write two parquet datasets.

    The files in ``paths_chunk`` are read as JSON (with the ``compression``
    option set to ``bzip2`` when the module-level ``updating`` flag is truthy),
    reduced to a fixed set of columns, and ``created_at`` is parsed to a
    timestamp. Rows with ``created_at`` after ``'2019-07-01'`` are kept and
    exact duplicate rows are dropped. Two datasets are then written in
    ``overwrite`` mode:

    * all remaining rows, to ``os.path.join(path_to_output, str(i_chunk))``;
    * one row per ``user_id`` (the row with the latest ``created_at``), to
      ``os.path.join(path_to_users, str(i_chunk))``.

    Uses the module-level names ``spark``, ``updating``, ``path_to_output``
    and ``path_to_users``.

    Parameters
    ----------
    i_chunk : index of the chunk; used as the output sub-directory name.
    paths_chunk : iterable of input file paths; converted to a list before
        being passed to the Spark reader.

    Returns
    -------
    None
    """
    # Local import: the notebook's import cells do not bring in Window.
    from pyspark.sql.window import Window

    reader = spark.read
    if updating:
        reader = reader.option("compression", "bzip2")
    df = reader.json(list(paths_chunk))

    # NOTE(review): 1000 is a fixed partition count - presumably tuned for
    # the size of the input; confirm before reusing on other data.
    df = df.repartition(1000)

    # Keep the fields of interest; the two nested bounding_box fields are
    # flattened into top-level columns.
    df = df.select(
        'id_str',
        'created_at',
        'full_text',
        'lang',
        'user_id',
        'user_name',
        'country_code',
        'city',
        col('bounding_box.coordinates').alias('coordinates'),
        col('bounding_box.type').alias('bounding_box_type'),
    )

    df = df.withColumn('created_at', to_timestamp('created_at',"EEE MMM dd HH:mm:ss ZZZZZ yyyy"))
    df = df.filter(col('created_at') > '2019-07-01')

    df = df.dropDuplicates()

    # dropDuplicates(subset=...) keeps an arbitrary row per key, so sorting
    # beforehand does not guarantee that the newest row is the one kept.
    # Rank the rows of each user by recency and keep rank 1 explicitly.
    recency = Window.partitionBy('user_id').orderBy(desc('created_at'))
    most_recent_id = (
        df.withColumn('_recency_rank', row_number().over(recency))
        .filter(col('_recency_rank') == 1)
        .drop('_recency_rank')
        # single partition, as in the previous implementation
        .coalesce(1)
    )

    df.write.mode("overwrite").parquet(os.path.join(path_to_output,str(i_chunk)))

    most_recent_id.write.mode("overwrite").parquet(os.path.join(path_to_users,str(i_chunk)))
    

In [11]:
# Process the chunks one after another and report the wall-clock time
# spent on each.
for i_chunk, paths_chunk in enumerate(paths_chunks):
    print('EXTRACT CHUNK', i_chunk)

    start = timer()
    extract_chunk(i_chunk, paths_chunk)
    elapsed = timer() - start

    print('TIME:', round(elapsed), 'SEC')
    

EXTRACT CHUNK 0
TIME: 181 SEC
EXTRACT CHUNK 1
TIME: 123 SEC
EXTRACT CHUNK 2
TIME: 122 SEC
EXTRACT CHUNK 3
TIME: 129 SEC
EXTRACT CHUNK 4
TIME: 127 SEC
EXTRACT CHUNK 5
TIME: 134 SEC
EXTRACT CHUNK 6
TIME: 123 SEC
EXTRACT CHUNK 7
TIME: 126 SEC
EXTRACT CHUNK 8
TIME: 132 SEC
EXTRACT CHUNK 9
TIME: 138 SEC
