In [None]:
from pyspark.sql.session import SparkSession
# Single capture file from the Twitter firehose sample; the reads in this notebook use it.
path = '/mnt/training/twitter/firehose/2018/01/08/18/twitterstream-1-2018-01-08-18-48-00-bcf3d615-9c04-44ec-aac9-25f966490aa4'

# Attach to the session that is already running, or start one if none exists.
spark = SparkSession.builder.getOrCreate()

# No schema is supplied here, so Spark infers one from the JSON records.
df = spark.read.format("json").load(path)

Displaying the schema.

In [None]:
# Print the column tree of df, which was read from the JSON file without an explicit schema.
df.printSchema()

Counting the records in the file and saving the result to `dfCount`.

In [None]:
# count() is a Spark action: it runs a job over the file and returns the number of rows in df.
dfCount = df.count()

Create a schema for the JSON data that extracts only the fields that are needed.

In [None]:
from pyspark.sql.types import StructField, StructType, ArrayType, StringType, IntegerType, LongType

# Author of the tweet: these are the profile attributes the account table selects later.
userStruct = StructType([
  StructField("id", LongType(), True),
  StructField("screen_name", StringType(), True),
  StructField("location", StringType(), True),
  StructField("friends_count", IntegerType(), True),
  StructField("followers_count", IntegerType(), True),
  StructField("description", StringType(), True)
])

# Element type of the entities.hashtags array.
hashtagStruct = StructType([
  StructField("text", StringType(), True)
])

# Element type of the entities.urls array.
urlStruct = StructType([
  StructField("url", StringType(), True),
  StructField("expanded_url", StringType(), True),
  StructField("display_url", StringType(), True)
])

# Both entity collections are nullable arrays of structs.
entitiesStruct = StructType([
  StructField("hashtags", ArrayType(hashtagStruct), True),
  StructField("urls", ArrayType(urlStruct), True)
])

# Top-level record: every field is nullable, and created_at stays a string
# until it is parsed in a later cell.
fullTweetSchema = StructType([
  StructField("id", LongType(), True),
  StructField("user", userStruct, True),
  StructField("entities", entitiesStruct, True),
  StructField("lang", StringType(), True),
  StructField("text", StringType(), True),
  StructField("created_at", StringType(), True)
])

# Re-read the same file, this time with the explicit schema instead of inference.
fullTweetDF = spark.read.json(path, schema=fullTweetSchema)
fullTweetDF.printSchema()
display(fullTweetDF)

## Creating the Tables

Apply the schema to create tables for a relational data model.

### Filtering Nulls

The Twitter data contains both deletions and tweets. This is why some records appear as null values. Create a DataFrame called `fullTweetFilteredDF` that filters out the null values.

In [None]:
from pyspark.sql.functions import col

# Keep only rows that carry a tweet id; rows where id is null are dropped.
hasTweetId = col("id").isNotNull()
fullTweetFilteredDF = fullTweetDF.where(hasTweetId)

display(fullTweetFilteredDF)

Twitter uses a non-standard timestamp format that Spark doesn't recognize. Currently the `created_at` column is formatted as a string. Parse the `created_at` column using `unix_timestamp`, and cast the result to `TimestampType`. The timestamp format is `EEE MMM dd HH:mm:ss ZZZZZ yyyy`.

In [None]:
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.types import TimestampType

# Pattern describing the raw created_at strings.
# NOTE(review): how the EEE and ZZZZZ letters are interpreted may depend on the
# datetime parser of the Spark version in use; confirm on the target runtime.
timestampFormat = "EEE MMM dd HH:mm:ss ZZZZZ yyyy"

# Parse the string with unix_timestamp, then cast the result to a timestamp column.
createdAtParsed = unix_timestamp(col("created_at"), timestampFormat)
createdAtColumn = createdAtParsed.cast(TimestampType()).alias("createdAt")

# Tweet table: one row per tweet, carrying the author's id as userID.
tweetDF = fullTweetFilteredDF.select(
  col("id").alias("tweetID"),
  col("user.id").alias("userID"),
  col("lang").alias("language"),
  col("text"),
  createdAtColumn
)

display(tweetDF)

In [None]:
# Account table: profile attributes pulled out of the nested user struct.
# One row is produced per tweet row; no de-duplication is applied here.
accountColumns = [
    col("user.id").alias("userID"),
    col("user.screen_name").alias("screenName"),
    col("user.location"),
    col("user.friends_count").alias("friendsCount"),
    col("user.followers_count").alias("followersCount"),
    col("user.description"),
]
accountDF = fullTweetFilteredDF.select(*accountColumns)

display(accountDF)

In [None]:
from pyspark.sql.functions import explode

# Hashtag table: explode the array of hashtag texts into one row per (tweet, hashtag).
tweetIdColumn = col("id").alias("tweetID")
hashtagColumn = explode(col("entities.hashtags.text")).alias("hashtag")
hashtagDF = fullTweetFilteredDF.select(tweetIdColumn, hashtagColumn)

hashtagDF.show()

In [None]:
# Step 1: explode the urls array so each URL struct gets its own row beside the tweet id.
explodedUrlDF = fullTweetFilteredDF.select(
    col("id").alias("tweetID"),
    explode(col("entities.urls")).alias("urls")
)

# Step 2: flatten the struct into plain, renamed columns.
urlDF = explodedUrlDF.select(
    col("tweetID"),
    col("urls.url").alias("URL"),
    col("urls.display_url").alias("displayURL"),
    col("urls.expanded_url").alias("expandedURL")
)

urlDF.show()


## Loading the Results

Saving the DataFrames in Parquet format.

In [None]:
# Persist each table as Parquet, replacing any earlier output at the same location.
# The tuple keeps the original write order: account, tweet, hashtag, url.
for tableDF, targetPath in (
    (accountDF, "/tmp/account.parquet"),
    (tweetDF, "/tmp/tweet.parquet"),
    (hashtagDF, "/tmp/hashtag.parquet"),
    (urlDF, "/tmp/url.parquet"),
):
    tableDF.write.parquet(targetPath, mode="overwrite")