In [1]:
import pyspark
from pyspark import SparkContext
sc = SparkContext.getOrCreate();
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [2]:
sc.stop()

In [3]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('DataAnalysisOnElonMusk').getOrCreate()

In [4]:
import os
import re
from datetime import date, datetime
import pandas as pd

In [5]:
file_path_name = 'elonmusk.csv'

In [6]:
def open_file(file_path_name):
    return pd.read_csv(file_path_name, index_col=[0])

In [7]:
print(open_file(file_path_name).head())

                         conversation_id     created_at        date      time  \
id                                                                              
1282939902531796993  1282933079431151618  1594711683000  2020-07-14  07:28:03   
1282844872571904000  1282801938111791104  1594689026000  2020-07-14  01:10:26   
1282805559834492929  1282758532312584193  1594679653000  2020-07-13  22:34:13   
1282800187308572672  1282671714657157120  1594678372000  2020-07-13  22:12:52   
1282800078000803840  1282739486816964615  1594678346000  2020-07-13  22:12:26   

                    timezone   user_id  username       name  place  \
id                                                                   
1282939902531796993      UTC  44196397  elonmusk  Elon Musk    NaN   
1282844872571904000      UTC  44196397  elonmusk  Elon Musk    NaN   
1282805559834492929      UTC  44196397  elonmusk  Elon Musk    NaN   
1282800187308572672      UTC  44196397  elonmusk  Elon Musk    NaN   
128280007800

In [8]:
def clean_dataframe(df, columns_to_drop):
    df = drop_redundant_columns(df, columns_to_drop)
    return df
def transform_dataframe(df):
    df = drop_columns_with_constant_values(df)
    add_mentions_count(df)
    add_weekday(df)
    add_reply_to_count(df)
    add_photos_count(df)
    convert_to_datetime(df)
    extract_hour_minute(df)
    df = drop_redundant_columns(df, ['photos', 'date', 'mentions', 'reply_to', 'reply_to_count'])
    return df

In [9]:
columns_to_drop = ['hashtags', 'cashtags', 'link', 'quote_url', 'urls', 'created_at']
def drop_redundant_columns(df, columns_to_drop):
    return df.drop(columns=columns_to_drop, axis=0)
def drop_columns_with_constant_values(df):
    return df.drop(columns=list(df.columns[df.nunique() <= 1]))
def add_mentions_count(df):
    new_values = []
    for i, content in df['mentions'].items():
        new_values.append(int(content.count("'") / 2))
    df['mentions_count'] = new_values
    return df
def add_weekday(df):
    weekday = []
    for i, content in df['date'].items():
        year, month, day = map(int, content.split('-'))
        d = date(year, month, day)
        weekday.append(d.weekday())
    df['weekday'] = weekday
    return df
def add_reply_to_count(df):
    reply_to_count_values = []
    for i, content in df['reply_to'].items():
        reply_to_count_values.append((int(content.count("{")) - 1))
    df['reply_to_count'] = reply_to_count_values
    return df
def add_photos_count(df):
    new_values = []
    for i, content in df['photos'].items():
        new_values.append(int(content.count("https")))
    df['photos_count'] = new_values
    return df
def convert_to_datetime(df):
    df['datetime'] = (df['date'] + " " + df['time']).astype('string')
    return df

In [10]:
def extract_hour_minute(df):
    year_col = []
    month_col = []
    hour_col = []
    minute_col = []
    for i, content in df['datetime'].items():
        t1 = datetime.strptime(content, '%Y-%m-%d %H:%M:%S')
        year_col.append(t1.year)
        month_col.append(t1.month)
        hour_col.append(t1.hour)
        minute_col.append(t1.minute)
    df['year'] = year_col
    df['month'] = month_col
    df['hour'] = hour_col
    df['minute'] = minute_col
    return df

In [11]:
df = open_file(file_path_name)
new_df = clean_dataframe(df, columns_to_drop)
new_df = transform_dataframe(new_df)

In [12]:
df['tweet'] = df['tweet'].str.lower()
data = []
for i,j in zip(new_df,new_df.count()):
    data.append((i,str(j)))
rdd = spark.sparkContext.parallelize(data)
resultCount = rdd.collect()
print(resultCount)

[('conversation_id', '9286'), ('time', '9286'), ('tweet', '9286'), ('replies_count', '9286'), ('retweets_count', '9286'), ('likes_count', '9286'), ('video', '9286'), ('mentions_count', '9286'), ('weekday', '9286'), ('photos_count', '9286'), ('datetime', '9286'), ('year', '9286'), ('month', '9286'), ('hour', '9286'), ('minute', '9286')]


In [13]:
#Dropping duplicates from previous count data
new_df.drop_duplicates(subset=['tweet'], keep='first', inplace=True)
#print(new_df.shape)
shape = spark.sparkContext.parallelize([new_df.shape]).collect()
print(shape)
data2 = []
for i,j in zip(new_df,new_df.count()):
    data2.append((i,str(j)))
rdd = spark.sparkContext.parallelize(data2)
resultCount2 = rdd.collect()
print(resultCount2)

[(8356, 15)]
[('conversation_id', '8356'), ('time', '8356'), ('tweet', '8356'), ('replies_count', '8356'), ('retweets_count', '8356'), ('likes_count', '8356'), ('video', '8356'), ('mentions_count', '8356'), ('weekday', '8356'), ('photos_count', '8356'), ('datetime', '8356'), ('year', '8356'), ('month', '8356'), ('hour', '8356'), ('minute', '8356')]


In [14]:
count = new_df['tweet'].str.split().str.len()
count.index = count.index.astype(str) + ' words:'
count.sort_index(inplace=True)
def word_count(df):
    words_count = []
    for i, content in df['tweet'].items():
        new_values =[]
        new_values = content.split()
        words_count.append(len(new_values))
    df['word_count'] = words_count
    return df
new_df = word_count(new_df)
print("Total number of words: ", count.sum(), "words")

Total number of words:  112254 words


In [15]:
print("Average number of words per tweet: ", round(count.mean(),2), "words")
print("Max number of words per tweet: ", count.max(), "words")
print("Min number of words per tweet: ", count.min(), "words")

Average number of words per tweet:  13.43 words
Max number of words per tweet:  58 words
Min number of words per tweet:  1 words


In [16]:
new_df['tweet_length'] = new_df['tweet'].str.len()
print("Total length of a dataset: ", new_df.tweet_length.sum(), "characters")
print("Average length of a tweet: ", round(new_df.tweet_length.mean(),0), "characters")

Total length of a dataset:  705735 characters
Average length of a tweet:  84.0 characters
