In [10]:
import datetime
from dateutil.parser import parse

import sys
assert sys.version_info >= (3, 5) # make sure we have Python 3.5+

from pyspark.sql import SparkSession, functions, types
spark = SparkSession.builder.appName('example code').getOrCreate()
assert spark.version >= '2.3' # make sure we have Spark 2.3+
# spark.sparkContext.setLogLevel('WARN')
# sc = spark.sparkContext

# # Reading the json data.
# df = spark.read.json('./twitter-2018-10-01/2018/10/01/01/55.json.bz2')

# Reading the json data.
df = spark.read.json('./data/55.json.bz2')

# list(df.columns)
'''
Columns in the dataframe df.
['contributors',
 'coordinates',
 'created_at',
 'delete',
 'display_text_range',
 'entities',
 'extended_entities',
 'extended_tweet',
 'favorite_count',
 'favorited',
 'filter_level',
 'geo',
 'id',
 'id_str',
 'in_reply_to_screen_name',
 'in_reply_to_status_id',
 'in_reply_to_status_id_str',
 'in_reply_to_user_id',
 'in_reply_to_user_id_str',
 'is_quote_status',
 'lang',
 'place',
 'possibly_sensitive',
 'quote_count',
 'quoted_status',
 'quoted_status_id',
 'quoted_status_id_str',
 'quoted_status_permalink',
 'reply_count',
 'retweet_count',
 'retweeted',
 'retweeted_status',
 'source',
 'text',
 'timestamp_ms',
 'truncated',
 'user',
 'withheld_in_countries']
'''

# Keeping only the required fields.
df_fields_filtered = df.select(df['user'], df['timestamp_ms'], df['lang'], df['text'], df['favorite_count'], df['retweet_count'], df['possibly_sensitive'], df['delete'])

# Keeping only the tweets in the english language.
df_lang_fields_filtered = df_fields_filtered.filter(df['lang'] == 'en')

# Dropping the language column.
df_lang_fields_filtered = df_lang_fields_filtered.drop('lang')

# Filtering the records whose timestamp_ms field is null.
df_time_lang_fields_filtered = df_lang_fields_filtered.filter(df_lang_fields_filtered['timestamp_ms'].isNotNull())

'''
Structure of the user column: contributors_enabled, created_at, default_profile, 
    default_profile_image, description, favourites_count, follow_request_sent, followers_count, 
    following, friends_count, geo_enabled, id, id_str, is_translator, lang, listed_count, 
    location, name, notifications, profile_background_color, profile_background_image_url, 
    profile_background_image_url_https, profile_background_tile, profile_banner_url, 
    profile_image_url, profile_image_url_https, profile_link_color, profile_sidebar_border_color, 
    profile_sidebar_fill_color, profile_text_color, profile_use_background_image, protected, 
    screen_name, statuses_count, time_zone, translator_type, url, utc_offset, verified.
'''

# Extracting the screen_name and followers_count from the user column and deleting the user column.
df_user_time_lang_fields_filtered = df_time_lang_fields_filtered.withColumn('screen_name', df_time_lang_fields_filtered['user']['screen_name'])
df_user_time_lang_fields_filtered = df_user_time_lang_fields_filtered.withColumn('followers_count', df_user_time_lang_fields_filtered['user']['followers_count'])
df_user_time_lang_fields_filtered = df_user_time_lang_fields_filtered.drop('user')

@functions.udf(returnType = types.DateType())
def convert_timestamp_to_date(timestamp_in_ms):
    date = datetime.datetime.fromtimestamp(float(timestamp_in_ms)/1000).strftime('%Y-%m-%d')
    return parse(date)

@functions.udf(returnType = types.DateType())
def convert_timestamp_to_time(timestamp_in_ms):
    time = datetime.datetime.fromtimestamp(float(timestamp_in_ms)/1000).strftime('%H:%M:%S.%f')
    return parse(time)

# Converting the timestamp in milliseconds to date.
df_user_time_lang_fields_filtered = df_user_time_lang_fields_filtered.withColumn('Date', convert_timestamp_to_date(df_user_time_lang_fields_filtered['timestamp_ms']))

# Converting the timestamp in milliseconds to time.
df_user_time_lang_fields_filtered = df_user_time_lang_fields_filtered.withColumn('Time', convert_timestamp_to_time(df_user_time_lang_fields_filtered['timestamp_ms']))

# Dropping the timestamp_ms column.
df_user_time_lang_fields_filtered = df_user_time_lang_fields_filtered.drop('timestamp_ms')

# Reordering the columns.
df_user_time_lang_fields_filtered = df_user_time_lang_fields_filtered.select('Date', 'screen_name', 'followers_count', 'Time', 'text', 'favorite_count', 'retweet_count', 'possibly_sensitive', 'delete')

# # To check the datatypes of the columns.
# print(df_time_user_lang_fields_filtered.dtypes)
'''
[('Date', 'date'), ('screen_name', 'string'), ('followers_count', 'bigint'),
('Time', 'date'), ('text', 'string'), ('favorite_count', 'bigint'),
('retweet_count', 'bigint'), ('possibly_sensitive', 'boolean'),
('delete',
'struct<status:struct<id:bigint,id_str:string,user_id:bigint,user_id_str:string>,timestamp_ms:string>')]
'''

df_user_time_lang_fields_filtered = df_user_time_lang_fields_filtered.withColumn('possibly_sensitive', df_user_time_lang_fields_filtered['possibly_sensitive'].cast(types.StringType()))

# # To check the datatypes of the columns.
# print(df_user_time_lang_fields_filtered.dtypes)
'''
[('Date', 'date'), ('screen_name', 'string'), ('followers_count', 'bigint'),
('Time', 'date'), ('text', 'string'), ('favorite_count', 'bigint'),
('retweet_count', 'bigint'), ('possibly_sensitive', 'string'),
('delete',
'struct<status:struct<id:bigint,id_str:string,user_id:bigint,user_id_str:string>,timestamp_ms:string>')]
'''

# https://stackoverflow.com/questions/45065636/pyspark-how-to-fillna-values-in-dataframe-for-specific-columns
# https://stackoverflow.com/questions/42312042/how-to-replace-all-null-values-of-a-dataframe-in-pyspark
null_replace_dictionary = { 'screen_name':'null', 'text':'null', 'possibly_sensitive':'null',
                            'followers_count':0, 'favorite_count':0, 'retweet_count':0}

# Replacing the null values in the dataframe so that when we use collect_list we can retain the mapping.
df_user_time_lang_fields_filtered = df_user_time_lang_fields_filtered.na.fill(null_replace_dictionary)

# Grouping the tweets by date and aggregating the rest of the columns with collect_list.
df_user_time_lang_fields_filtered = df_user_time_lang_fields_filtered.groupby('Date').agg(
                                    functions.collect_list('screen_name').alias('screen_name'),                                        
                                    functions.collect_list('followers_count').alias('followers_count'),
                                    functions.collect_list('Time').alias('Time'),
                                    functions.collect_list('text').alias('text'),
                                    functions.collect_list('favorite_count').alias('favorite_count'),
                                    functions.collect_list('retweet_count').alias('retweet_count'),
                                    functions.collect_list('possibly_sensitive').alias('possibly_sensitive'),
                                    functions.collect_list('delete').alias('delete')
                                    )


# To check the datatypes of the columns.
# print(df_user_time_lang_fields_filtered.dtypes)
'''
[('Date', 'date'),
 ('screen_name', 'array<string>'),
 ('followers_count', 'array<bigint>'),
 ('Time', 'array<date>'),
 ('text', 'array<string>'),
 ('favorite_count', 'array<bigint>'),
 ('retweet_count', 'array<bigint>'),
 ('possibly_sensitive', 'array<boolean>'),
 ('delete',
  'array<struct<status:struct<id:bigint,id_str:string,user_id:bigint,user_id_str:string>,timestamp_ms:string>>')]
'''

# To check the size of the columns.
# df_user_time_lang_fields_filtered.select(functions.size(df_user_time_lang_fields_filtered['screen_name'])).show()
# df_user_time_lang_fields_filtered.select(functions.size(df_user_time_lang_fields_filtered['followers_count'])).show()
# df_user_time_lang_fields_filtered.select(functions.size(df_user_time_lang_fields_filtered['Time'])).show()
# df_user_time_lang_fields_filtered.select(functions.size(df_user_time_lang_fields_filtered['text'])).show()
# df_user_time_lang_fields_filtered.select(functions.size(df_user_time_lang_fields_filtered['favorite_count'])).show()
# df_user_time_lang_fields_filtered.select(functions.size(df_user_time_lang_fields_filtered['retweet_count'])).show()
# df_user_time_lang_fields_filtered.select(functions.size(df_user_time_lang_fields_filtered['possibly_sensitive'])).show()

# # To check the values of the columns.
# df_user_time_lang_fields_filtered.select(df_user_time_lang_fields_filtered['Date']).distinct().show()

df_user_time_lang_fields_filtered.show()


# list(df.toPandas().columns.values)
# df = df.toPandas()
# df.head(50).to_csv('sdvsdvdsvdsv.csv', encoding='utf-8', index=False)

# df.toPandas().head(5)

# # add more functions as necessary

# def main(inputs, output):
#     # main logic starts here

# if __name__ == '__main__':
#     inputs = sys.argv[1]
#     output = sys.argv[2]
#     main(inputs, output)

+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------+
|      Date|         screen_name|     followers_count|                Time|                text|      favorite_count|       retweet_count|  possibly_sensitive|delete|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------+
|2018-10-01|[oizumix, chabell...|[32, 467, 389, 22...|[2019-03-03, 2019...|[Mon, 01 Oct 2018...|[0, 0, 0, 0, 0, 0...|[0, 0, 0, 0, 0, 0...|[null, null, fals...|    []|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------+

