In [None]:
from pyspark import *
from pyspark.sql import SQLContext
import json

In [None]:
# Arrow-based columnar transfer between Spark and pandas is controlled by this flag:
# "true" turns it on (for speed); "false", the value used here, leaves it off.
arrow_setting = "spark.sql.execution.arrow.enabled"
spark.conf.set(arrow_setting, "false")

In [None]:
# Build the SQLContext on top of the existing SparkContext `sc`.
# NOTE(review): `sc` is not created in this notebook; presumably the PySpark
# kernel/shell provides it. If not, uncomment the line below.
# sc = SparkContext()
sqlContext = SQLContext(sparkContext=sc)

In [None]:
import numpy as np
import pandas as pd
from pandas.io.json import json_normalize #package for flattening json in pandas df

In [None]:
# Read the raw test data file from disk and parse it into Python objects (stored in `d`).
with open("data/testData.json", encoding="utf-8") as json_file:
    d = json.loads(json_file.read())

In [None]:
# Flatten the nested JSON into a pandas DataFrame: one row per record found under
# each entry's 'tweet' list, with the parent 'tweet_id' carried along as a column.
pdf = json_normalize(
    d['tweets'],
    record_path='tweet',
    meta=['tweet_id'],
)
pdf.head()

In [None]:
#_____________Pandas Text Cleaning__________________
# Lowercase every whitespace-separated token and re-join the tokens with single
# spaces (which also collapses runs of whitespace).
lowered_text = pdf['tweet_text'].apply(
    lambda text: " ".join(token.lower() for token in text.split())
)
pdf['tweet_text'] = lowered_text
pdf['tweet_text'].head()

In [None]:
# Strip every ',' and '.' from the tweet text.
# regex=True is passed explicitly: the pattern is a character class, and pandas >= 2.0
# defaults Series.str.replace to regex=False, which would search for the literal
# text "[.,]" and leave the tweets unchanged.
rgx = '[.,]'  # character class matching a single ',' or '.'
pdf['tweet_text'] = pdf['tweet_text'].str.replace(rgx, '', regex=True)
pdf['tweet_text'].head()

In [None]:
# Count the hashtags in each tweet: whitespace-separated tokens that start with '#'.
# The column name 'hastags' (sic) is kept unchanged because later cells select and
# export the frame's columns by their existing names.
pdf['hastags'] = pdf['tweet_text'].apply(
    lambda text: sum(token.startswith('#') for token in text.split())
)
# Only the last expression of a cell is displayed, so the summary numbers are
# printed explicitly instead of being computed and silently discarded.
print("max hashtags in one tweet:", pdf['hastags'].max())
print("total hashtags:", pdf['hastags'].sum())
pdf[['tweet_text', 'hastags']].head()

In [None]:
# Count, per tweet, the words that begin with the search keyword.
# The keyword could later be widened to several terms
# (roadblocking, landslide, poweroutage, ...), whichever prove most suitable.
keyword = "help"
keyword_hits = pdf['tweet_text'].apply(
    lambda text: len([word for word in text.split() if word.startswith(keyword)])
)
pdf['keyword'] = keyword_hits
pdf[['keyword']].max()

In [None]:
# Break the location pair into two columns: element 0 becomes "lat", element 1 "long".
locations = pdf["tweet_location"]
pdf["lat"] = locations.apply(lambda pair: pair[0])
pdf["long"] = locations.apply(lambda pair: pair[1])
pdf

In [None]:
# Remove tweets that do not contain any of the keywords.
#keywordDf = pdf[(pdf['keyword'] >= 1)]
#keywordDf.head()

In [None]:
# Hand the cleaned pandas frame to Spark, producing the Spark DataFrame `df`.
df = sqlContext.createDataFrame(data=pdf)

In [None]:
# Echo the Spark DataFrame object as this cell's output.
df

In [None]:
# Preview the main tweet fields of the Spark DataFrame.
preview_columns = ["tweet_id", "tweet_date", "lat", "long", "tweet_text"]
df.select(*preview_columns).show()

In [None]:
# Pack latitude and longitude into a single "features" vector column so they can
# serve as the x and y inputs to k-means.
from pyspark.ml.feature import VectorAssembler
coordinate_columns = ["lat", "long"]
vecAssembler = VectorAssembler(outputCol="features", inputCols=coordinate_columns)
new_df = vecAssembler.transform(df)
new_df.select("tweet_id", "features", "tweet_text").show()

In [None]:
# Fit k-means on the (lat, long) feature vectors; k is the number of target clusters.
from pyspark.ml.clustering import KMeans

n_clusters = 16
kmeans = KMeans(seed=1, k=n_clusters)
feature_column = new_df.select('features')
model = kmeans.fit(feature_column)

In [None]:
# Attach each tweet's cluster index (column "prediction") and preview the result.
transformed = model.transform(new_df)
cluster_view = transformed.select("tweet_id", "prediction", "lat", "long", "tweet_text")
cluster_view.show()

In [None]:
# List every cluster's centroid next to its cluster index.
centers = model.clusterCenters()
print("Cluster Centers: ")
for cluster_id in range(len(centers)):
    print(cluster_id, centers[cluster_id])

In [None]:
# Print the column names and types of the clustered DataFrame.
transformed.printSchema()

In [None]:
# Gather, for every k-means cluster ("prediction"), the texts of all its tweets
# into one list column named "cluster_text", ordered by cluster index.
import pyspark.sql.functions as F
tweets_per_cluster = F.collect_list('tweet_text').alias("cluster_text")
grouped = transformed.groupBy('prediction').agg(tweets_per_cluster)
documents = grouped.orderBy("prediction")

In [None]:
# Preview the per-cluster aggregation.
documents.show()

In [None]:
# Inspect the first row's cluster_text value (a list of tweet texts when the
# notebook is run in order).
documents.select("cluster_text").first()

In [None]:
# Collapse each cluster's list of tweets into one comma-separated string.
joined_text = F.concat_ws(',', F.col('cluster_text'))
documents = documents.withColumn('cluster_text', joined_text)
documents.select("cluster_text").first()

In [None]:
# Bring the clustered Spark DataFrame (all columns) back as a pandas DataFrame.
all_columns = transformed.select("*")
result_pdf = all_columns.toPandas()

In [None]:
# Preview the first five rows of the pandas result.
result_pdf.head(5)

In [None]:
# Write the clustered tweets to a JSON file, one object per row (orient='records').
# The encoding is given explicitly, matching the utf-8 used when reading the input
# file, so the output does not depend on the platform's default encoding.
with open('data/testDataCluster.json', 'w', encoding='utf-8') as f:
    f.write(result_pdf.to_json(orient='records'))

In [None]:
# Stop the SparkContext; run this last.
sc.stop()