In [16]:
import os
import sys
import socket
from timeit import default_timer as timer
from datetime import datetime, timedelta

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, from_json
import pyspark.sql.functions as F
from pyspark.sql.types import MapType, StringType, IntegerType, StructType, StructField, FloatType, ArrayType

In [23]:
# Epoch timestamps in milliseconds, hard-coded by hand; their origin is not
# recorded here (presumably the start/end of the Spark job — TODO confirm).
JOB_START_MS = 1564015512745
JOB_END_MS = 1564019038783

# Subtract the raw millisecond counts rather than two naive datetimes from
# fromtimestamp(): those are local wall-clock times, so a DST change between
# the two instants would shift the difference by an hour.
running_time = timedelta(milliseconds=JOB_END_MS - JOB_START_MS)

print('Running Time:')
running_time

Running Time:


datetime.timedelta(0, 3526, 38000)

# Config

In [None]:
try:
    spark
except NameError:
    if 'samuel' in socket.gethostname().lower():
        print('Create Local SparkSession')
        spark = SparkSession.builder.config(
        "spark.driver.host", "localhost").appName(
        "extract-data-from-geolocated-tweets").getOrCreate()
    else:
        print('Create Cluster SparkSession')
        spark = SparkSession.builder.appName(
        "extract-data-from-geolocated-tweets").getOrCreate()
    
# Local
print('Hostname:', socket.gethostname())
if  'samuel' in socket.gethostname().lower():
    path_to_tweets  = '../../data/tweets/tweets-with-geocoordinates-or-place/'
    path_to_locations = '../data/locations/'
# Cluster
else:
    path_to_tweets   = '/user/spf248/twitter/parsed/tweets/tweets-with-geocoordinates-or-place/'
    path_to_locations = '/user/spf248/twitter/data/locations/'

# Load Data

In [None]:
print('Import:')
start = timer()

# One file is read first only to infer a schema. That schema is then applied
# to the full glob, presumably so Spark does not infer it by scanning every
# file (TODO confirm the motivation).
SCHEMA_SAMPLE_FILE = 'tweets-with-geocoordinates-or-place-from-decahose-partition-9-block-95.json.bz2'
ALL_TWEET_FILES = 'tweets-with-geocoordinates-or-place-from-decahose-partition-*-block-*.json.bz2'


def _tweet_reader():
    """Return a DataFrameReader configured for the bzip2-compressed JSON tweet dumps.

    Both reads in this cell must use identical options, so they are defined
    once here. multiLine is required because reading these files without it
    raised errors. FAILFAST makes Spark raise on the first malformed record
    instead of tolerating it.
    """
    return (spark.read
            .option('compression', 'bzip2')
            .option("multiLine", "true")
            .option("encoding", "UTF-8")
            .option("mode", "FAILFAST"))


schema = _tweet_reader().json(path_to_tweets + SCHEMA_SAMPLE_FILE).schema
df = _tweet_reader().schema(schema).json(path_to_tweets + ALL_TWEET_FILES)

end = timer()
print('Computing Time:', round(end - start), 'sec')

In [None]:
print('Repartition')
# `tweets` is created here from the loaded DataFrame `df`. The previous
# `tweets.repartition(...)` read `tweets` before any cell assigned it, so it
# failed with NameError on a fresh kernel.
# Partition count chosen by the author; no rationale is recorded.
N_PARTITIONS = 1000
tweets = df.repartition(N_PARTITIONS)

# Select Fields

In [4]:
def flatten(schema, prefix=None):
    """Return dotted column paths for every leaf field of a Spark StructType.

    Struct fields are expanded recursively into ``parent.child`` paths. A
    field whose type is an array is looked through one level: if its element
    type is a struct, that struct is expanded too. Otherwise, including for
    arrays of arrays, the field is kept as a single leaf path.

    Args:
        schema: a StructType (e.g. ``df.schema``).
        prefix: dotted path of the enclosing field, or None at the top level.

    Returns:
        list of str paths in schema field order.
    """
    leaves = []
    for field in schema.fields:
        path = field.name if not prefix else prefix + '.' + field.name

        inner_type = field.dataType
        # Unwrap exactly one array level so array<struct> is expanded as well.
        if isinstance(inner_type, ArrayType):
            inner_type = inner_type.elementType

        if not isinstance(inner_type, StructType):
            leaves.append(path)
            continue

        leaves.extend(flatten(inner_type, prefix=path))

    return leaves

In [5]:
# Keep only the `place` struct and discard tweets that have no place.
tweets = tweets.select('place').where(col("place").isNotNull())

# Promote every leaf of the nested `place` struct to its own column.
place_columns = flatten(tweets.schema)
tweets = tweets.select(place_columns)

In [6]:
# printSchema() writes the schema tree to stdout itself and returns None.
# It must not be passed to print(), which appended a spurious "Schema: None".
print("Schema:")
tweets.printSchema()

root
 |-- coordinates: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: double (containsNull = true)
 |-- type: string (nullable = true)
 |-- country: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- place_type: string (nullable = true)
 |-- url: string (nullable = true)

Schema: None


In [7]:
print('GROUPBY ID')

# Collapse to one row per place id. Each descriptive attribute comes from the
# first row Spark sees in the group. F.first is order-dependent, so this
# assumes the attributes agree within an id (TODO confirm). n_obs is the
# number of rows per id.
PLACE_ATTRIBUTES = ['coordinates', 'type', 'country', 'country_code',
                    'full_name', 'name', 'place_type', 'url']

aggregations = [F.first(tweets[column]).alias(column) for column in PLACE_ATTRIBUTES]
aggregations.append(F.count(tweets['id']).alias('n_obs'))

tweets = tweets.groupBy(tweets['id']).agg(*aggregations)

GROUPBY ID


In [7]:
print('Save')
start = timer()

# Write the per-place table as parquet, replacing any previous output there.
tweets.write.parquet(path_to_locations + 'pois', mode="overwrite")

elapsed = timer() - start
print('Computing Time:', round(elapsed), 'sec')

Save
Computing Time: 10 sec
