# README

First, in this notebook, we'll use Tim Dettmers' dump of 100 GB of random Twitter posts and group it by user to see how many posts each user has. If users don't have many posts, we'll use this dump only to sample users from Twitter. If they have plenty of posts, we'll just use Tim's data.

In [1]:
import os
import sys
import time

In [2]:
# Set JAVA_HOME to the OpenJDK 1.8 install (presumably so PySpark starts its JVM from it).
os.environ.update(
    JAVA_HOME='/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.282.b08-1.el7_9.x86_64',
)

In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

In [4]:
print("Creating Spark session.")
# Settings for a single-machine ("local") Spark run.
configuation_properties = [
    ("spark.master","local[95]"),   # local mode with 95 worker threads
    ("spark.ui.port","4050"),
    ("spark.executor.memory","750g"),
    ('spark.driver.memory',  '2000g'),
    # Very large timeouts. They are given without unit suffixes, so Spark applies
    # each setting's own default time unit.
    ("spark.network.timeout",            "10000001"),
    ("spark.executor.heartbeatInterval", "10000000")
    #("spark.dynamicAllocation.enabled","true"),
    #("spark.shuffle.service.enabled","true"),
]

conf = SparkConf().setAll( configuation_properties )

# create the context
# getOrCreate() reuses the running context when this cell is executed again;
# constructing a second SparkContext in the same kernel raises
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()

# for logging temporarily
# sc.setLogLevel('DEBUG')
print("Spark session created.")

Creating Spark session.
Spark session created.


In [5]:
#path = '/projects/bdata/bdatasets/data/twitter_data_2020-12-03_0.json'
path = '/projects/bdata/bdatasets/data/twitter_data_*.json'  # glob pattern, not a single file

print(f'Loading data from {path}.')
start_time = time.monotonic()

# No schema is supplied here, so Spark derives it from the JSON files themselves.
data = spark.read.json(path)

elapsed_minutes = (time.monotonic() - start_time) / 60
print("Finished loading in {:5.3f} minutes.".format(elapsed_minutes))

Loading data from /projects/bdata/bdatasets/data/twitter_data_2020-12-03_0.json.
Finished loading in 1.780 minutes.


In [6]:
# count() is a Spark action, so this line runs a job over the loaded data.
num_tweets = data.count()
print(f'Loaded {num_tweets:,d} tweets')

Loaded 2,249,991 tweets


# After digging in the complex schema...

It seems we only care about two or three columns:

- text
- user.screen_name
- created_at

In [7]:
# chuck all the other columns
# Guarded so this cell can be re-run: after the first run `data` has already been
# replaced by the two-column frame (screen_name, text), and selecting the nested
# 'user.screen_name' from that frame again would fail to resolve.
if 'user' in data.columns:
    data = data.select(['user.screen_name', 'text'])

In [8]:
# One row per screen_name with its tweet count, largest counts first.
tweets_per_user = data.groupBy('screen_name').count()
users = tweets_per_user.orderBy(F.desc('count'))

In [9]:
# `users` has one row per screen_name, so its row count is the number of distinct users.
num_users = users.count()
user_count_message = f'Found {num_users:,d} unique users.'
print(user_count_message)

Found 182,790 unique users.


In [11]:
# sample users and write to file
start_time = time.monotonic()

# want 8,000 users, let's get 10,000 just to be safe
# sample() treats the fraction as a per-row probability, so the result is only
# approximately 10,000 rows. Without replacement the fraction must lie in [0, 1],
# hence the clamp for datasets with fewer than 10,000 users; max(..., 1) avoids a
# ZeroDivisionError when there are no users at all.
frac_to_sample = min(1.0, 10000 / max(num_users, 1))
sample = users.sample(withReplacement=False, fraction=frac_to_sample, seed=123456)
# coalesce(1) leaves a single partition, so the output directory holds one CSV part file.
sample.coalesce(1).write.csv( 'random_users_sample', mode='overwrite', header=False)

print("Finished computing and writing output in {:5.3f} minutes.".format( (time.monotonic()-start_time)/60 ))

Finished computing and writing output in 0.070 minutes.


In [12]:
# Keep only the users whose tweet count is 10 or more.
users = users.where(F.col('count') >= 10)

In [13]:
# Row count of `users` as it stands when this cell runs.
num_active_users = users.count()
print(f'{num_active_users:,d} users have at least 10 tweets.')

55,387 users have at least 10 tweets.
