In [1]:
sc

### Importing libraries 

In [2]:
# import SparkSession library 
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('data_processing').getOrCreate()

In [3]:
# Load the libraries
import os
import numpy as np
import pandas as pd
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.sql import functions as f
from pyspark.sql.functions import udf, StringType
from pyspark.sql.types import StructType
from pyspark.sql import SparkSession, functions as F
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer




In [4]:
from pyspark.sql.functions import concat, substring, from_unixtime, unix_timestamp, lit, to_timestamp, to_date

### Loading the original ProjectTweets.csv dataset before doing the analysis on hive.

In [12]:
Schema = StructType([
    StructField("index", IntegerType(), True), 
    StructField("ids", IntegerType(), True), 
    StructField("date", StringType(), True), 
    StructField("falg", StringType(), True), 
    StructField("user", StringType(), True),
     StructField("text", StringType(), True)])
    

data= spark.read.load('hdfs://localhost:9000/user1/ProjectTweets.csv', format="csv", header="true", sep=',', schema=Schema)
data.printSchema()


root
 |-- index: integer (nullable = true)
 |-- ids: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- falg: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



###### Checking for null values.  We see  column ids has 432913 missing values.

In [14]:
from pyspark.sql.functions import col,isnan,when,count
data2 = data.select([count(when(col(c).contains('None') | \
                            col(c).contains('NULL') | \
                            (col(c) == '' ) | \
                            col(c).isNull() | \
                            isnan(c), c 
                           )).alias(c)
                    for c in data.columns])
data2.show()

2023-11-05 20:23:04,364 WARN csv.CSVHeaderChecker: CSV header does not conform to the schema.
 Header: 0, 1467810369, Mon Apr 06 22:19:45 PDT 2009, NO_QUERY, _TheSpecialOne_, @switchfoot http://twitpic.com/2y1zl - Awww, that's a bummer.  You shoulda got David Carr of Third Day to do it. ;D
 Schema: index, ids, date, falg, user, text
Expected: index but found: 0
CSV file: hdfs://localhost:9000/user1/ProjectTweets.csv

+-----+------+----+----+----+----+
|index|   ids|date|falg|user|text|
+-----+------+----+----+----+----+
|    0|432913|   0|   0|  23| 260|
+-----+------+----+----+----+----+



                                                                                

#### loading the  sentiments dataset and create schema for the table.

In [20]:
customSchema = StructType([
    #StructField("index", IntegerType(), True), 
    StructField("ids", IntegerType(), True), 
    StructField("time", StringType(), True), 
    StructField("score", ByteType(), True), 
    StructField("sentiment", StringType(), True)])
    

tweets_sentiment= spark.read.load('hdfs://localhost:9000//user1/tweet_sent_2.csv', format="csv", header="false", sep=',', schema=customSchema)
tweets_sentiment.printSchema()

#data = spark.read.csv('/user1/ProjectTweets.csv', header=True, inferSchema=True)

root
 |-- ids: integer (nullable = true)
 |-- time: string (nullable = true)
 |-- score: byte (nullable = true)
 |-- sentiment: string (nullable = true)



In [21]:
tweets_sentiment.show()

+----+--------------------+-----+---------+
| ids|                time|score|sentiment|
+----+--------------------+-----+---------+
|null|Fri Jun 19 00:00:...|    1| positive|
|null|Fri Jun 19 00:00:...|   -4| negative|
|null|Fri Jun 19 00:00:...|   -1| negative|
|null|Fri Jun 19 00:00:...|    2| positive|
|null|Fri Jun 19 00:00:...|   -3| negative|
|null|Fri Jun 19 00:00:...|  -15| negative|
|null|Fri Jun 19 00:00:...|   -3| negative|
|null|Fri Jun 19 00:00:...|   -3| negative|
|null|Fri Jun 19 00:00:...|    2| positive|
|null|Fri Jun 19 00:00:...|   -4| negative|
|null|Fri Jun 19 00:00:...|   -2| negative|
|null|Fri Jun 19 00:00:...|   -1| negative|
|null|Fri Jun 19 00:00:...|   -5| negative|
|null|Fri Jun 19 00:00:...|    1| positive|
|null|Fri Jun 19 00:00:...|    4| positive|
|null|Fri Jun 19 00:00:...|    1| positive|
|null|Fri Jun 19 00:00:...|   -3| negative|
|null|Fri Jun 19 00:01:...|    2| positive|
|null|Fri Jun 19 00:01:...|    1| positive|
|null|Fri Jun 19 00:01:...|   11

In [22]:
type(tweets_sentiment)

pyspark.sql.dataframe.DataFrame

In [23]:
# Display the structure of schema
tweets_sentiment.printSchema()

root
 |-- ids: integer (nullable = true)
 |-- time: string (nullable = true)
 |-- score: byte (nullable = true)
 |-- sentiment: string (nullable = true)



#### The time column need to be changed to date time type column and the format of the date and time should be written in certain way to be understood by spark as a timestamp type. To do this, many functions applied first, apply the subsring function to take only the time then add the year as a string using the function lit.  The function concat to add the tow strings together. Then, the function unix_timestamp applied to put the the date and time in certain format and convert them to unix time stamp. Finally, form unix time stamp applying the function from_unixtime to put the date-time back into the required format with the correct type.

In [24]:
tweets_sentiment.select( from_unixtime(unix_timestamp( concat( substring('time', 5, 6), lit(" 2009 "), substring('time',12,8) ) , "MMM dd yyyy HH:mm:ss") ).alias("datetime") ).show()

+-------------------+
|           datetime|
+-------------------+
|2009-06-19 00:00:00|
|2009-06-19 00:00:01|
|2009-06-19 00:00:04|
|2009-06-19 00:00:13|
|2009-06-19 00:00:17|
|2009-06-19 00:00:20|
|2009-06-19 00:00:21|
|2009-06-19 00:00:34|
|2009-06-19 00:00:35|
|2009-06-19 00:00:37|
|2009-06-19 00:00:40|
|2009-06-19 00:00:43|
|2009-06-19 00:00:45|
|2009-06-19 00:00:50|
|2009-06-19 00:00:53|
|2009-06-19 00:00:54|
|2009-06-19 00:00:56|
|2009-06-19 00:01:02|
|2009-06-19 00:01:03|
|2009-06-19 00:01:11|
+-------------------+
only showing top 20 rows



###### save all the new changes to a new dataset named tweets_sent_df contains the ids, datetime, score and sentiment.

In [25]:
tweets_sent_df = tweets_sentiment.select(
    'ids',
    from_unixtime(unix_timestamp( concat( substring('time', 5, 6), lit(" 2009 "), substring('time',12,8) ) , "MMM dd yyyy HH:mm:ss") ).alias("datetime"),
    'score',
    'sentiment'
)

In [26]:
tweets_sent_df.show()

+----+-------------------+-----+---------+
| ids|           datetime|score|sentiment|
+----+-------------------+-----+---------+
|null|2009-06-19 00:00:00|    1| positive|
|null|2009-06-19 00:00:01|   -4| negative|
|null|2009-06-19 00:00:04|   -1| negative|
|null|2009-06-19 00:00:13|    2| positive|
|null|2009-06-19 00:00:17|   -3| negative|
|null|2009-06-19 00:00:20|  -15| negative|
|null|2009-06-19 00:00:21|   -3| negative|
|null|2009-06-19 00:00:34|   -3| negative|
|null|2009-06-19 00:00:35|    2| positive|
|null|2009-06-19 00:00:37|   -4| negative|
|null|2009-06-19 00:00:40|   -2| negative|
|null|2009-06-19 00:00:43|   -1| negative|
|null|2009-06-19 00:00:45|   -5| negative|
|null|2009-06-19 00:00:50|    1| positive|
|null|2009-06-19 00:00:53|    4| positive|
|null|2009-06-19 00:00:54|    1| positive|
|null|2009-06-19 00:00:56|   -3| negative|
|null|2009-06-19 00:01:02|    2| positive|
|null|2009-06-19 00:01:03|    1| positive|
|null|2009-06-19 00:01:11|   11| positive|
+----+-----

In [27]:
tweets_sent_new_df = tweets_sent_df.select('ids', to_timestamp(tweets_sent_df.datetime, 'yyyy-MM-dd HH:mm:ss').alias('datetime_new'), 'score', 'sentiment')

In [28]:
tweets_sent_new_df.show()

+----+-------------------+-----+---------+
| ids|       datetime_new|score|sentiment|
+----+-------------------+-----+---------+
|null|2009-06-19 00:00:00|    1| positive|
|null|2009-06-19 00:00:01|   -4| negative|
|null|2009-06-19 00:00:04|   -1| negative|
|null|2009-06-19 00:00:13|    2| positive|
|null|2009-06-19 00:00:17|   -3| negative|
|null|2009-06-19 00:00:20|  -15| negative|
|null|2009-06-19 00:00:21|   -3| negative|
|null|2009-06-19 00:00:34|   -3| negative|
|null|2009-06-19 00:00:35|    2| positive|
|null|2009-06-19 00:00:37|   -4| negative|
|null|2009-06-19 00:00:40|   -2| negative|
|null|2009-06-19 00:00:43|   -1| negative|
|null|2009-06-19 00:00:45|   -5| negative|
|null|2009-06-19 00:00:50|    1| positive|
|null|2009-06-19 00:00:53|    4| positive|
|null|2009-06-19 00:00:54|    1| positive|
|null|2009-06-19 00:00:56|   -3| negative|
|null|2009-06-19 00:01:02|    2| positive|
|null|2009-06-19 00:01:03|    1| positive|
|null|2009-06-19 00:01:11|   11| positive|
+----+-----

In [29]:
tweets_sent_new_df.printSchema()

root
 |-- ids: integer (nullable = true)
 |-- datetime_new: timestamp (nullable = true)
 |-- score: byte (nullable = true)
 |-- sentiment: string (nullable = true)



In [31]:
type(tweets_sent_new_df)

pyspark.sql.dataframe.DataFrame

#### save the processed dataset to hadoop as a csv file to be able to apply the deep learning part on it. Here we see error because I ran it many times. 

In [32]:
tweets_sent_new_df.write.option("header",True).csv("hdfs://localhost:9000//user1/tweets_sent_new_df.csv")

AnalysisException: path hdfs://localhost:9000/user1/tweets_sent_new_df.csv already exists.

In [33]:
tweets_sent_new_df.toPandas().to_csv('tweets_sent_new_df.csv')

                                                                                

TypeError: Casting to unit-less dtype 'datetime64' is not supported. Pass e.g. 'datetime64[ns]' instead.