# TIME SERIES FORECASTING AND SENTIMENT ANALYSIS OF BIG DATA PROCESSED WITH SparkSQL VS HBASE, CASSANDRA, MONGODB

# Dataset
The dataste is a large dataset gleaned from the twitter API that is called ProjectTweets.csv.

This dataset contains 1,600,000 tweets extracted using the twitter api. 


Content
It contains the following 5 fields:
- ids: The id of the tweet (eg. 4587)
- date: the date of the tweet (eg. Sat May 16 23:58:44 UTC 2009)
- flag: The query (eg. lyx). If there is no query, then this value is NO_QUERY.
- user: the user that tweeted (eg. bobthebuilder)
- text: the text of the tweet (eg. Lyx is cool)

# STOP ALL ACTIVE SPARK SESSIONS

In [1]:
from pyspark.sql import SparkSession
#Initialize SparkSession
spark = SparkSession.builder.appName("MyApp").getOrCreate()
spark.stop()

24/04/30 21:05:52 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StringType, TimestampType
from pyspark.sql.functions import col

# Loading Data from local machine to MongoDB

One can read  CSV file into DataFrame from hadoop

csv_path = "hdfs://localhost:9000/ProjectTweets/ProjectTweets.csv"

df = spark.read.csv(csv_path, header=False, inferSchema=True)

# Step One: Populating the ProjectTweets CSV Into SparkSQL

This involves adding or inserting the ProjectTweets CSV into SparkSQL.

## Initialize Spark Session and Read the ProjectTweets CSV

In [3]:
# Initialize the Spark Session
spark = SparkSession.builder.appName("ProjectTweets CSV to SparkSQL").getOrCreate()

# Set legacy timeParserPolicy
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Define the schema for the csv file 
schema = StructType().add("_c0", StringType(), True).add("_c1", StringType(), True).add("_c2", StringType(), True).add("_c3", StringType(), True).add("_c4", StringType(), True).add("_c5", StringType(), True)

# Read the CSV into a DataFrame called df
df = spark.read.format("csv").option("header", "false").schema(schema).load("file:///home/hduser/ProjectTweets.csv")

# Rename the headers
df = df.withColumnRenamed("_c0", "PRIMARY KEY").withColumnRenamed("_c1", "ID").withColumnRenamed("_c2", "date").withColumnRenamed("_c3", "flag").withColumnRenamed("_c4", "user").withColumnRenamed("_c5", "text")

# Convert string date to TimestampType
df = df.withColumn("date", to_timestamp(df["date"], "EEE MMM dd HH:mm:ss zzzz yyyy"))

# Print schema
df.printSchema()

# Show DataFrame
df.show(5)



24/04/30 21:06:00 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
root
 |-- PRIMARY KEY: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



[Stage 0:>                                                          (0 + 1) / 1]

+-----------+----------+-------------------+--------+---------------+--------------------+
|PRIMARY KEY|        ID|               date|    flag|           user|                text|
+-----------+----------+-------------------+--------+---------------+--------------------+
|          0|1467810369|2009-04-07 06:19:45|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|          1|1467810672|2009-04-07 06:19:49|NO_QUERY|  scotthamilton|is upset that he ...|
|          2|1467810917|2009-04-07 06:19:53|NO_QUERY|       mattycus|@Kenichan I dived...|
|          3|1467811184|2009-04-07 06:19:57|NO_QUERY|        ElleCTF|my whole body fee...|
|          4|1467811193|2009-04-07 06:19:57|NO_QUERY|         Karoli|@nationwideclass ...|
+-----------+----------+-------------------+--------+---------------+--------------------+
only showing top 5 rows



                                                                                

In [4]:
#View if the columns have been renamed
df.printSchema()

root
 |-- PRIMARY KEY: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



In [5]:
#View the Spark DataFrame
df.show(5)

+-----------+----------+-------------------+--------+---------------+--------------------+
|PRIMARY KEY|        ID|               date|    flag|           user|                text|
+-----------+----------+-------------------+--------+---------------+--------------------+
|          0|1467810369|2009-04-07 06:19:45|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|          1|1467810672|2009-04-07 06:19:49|NO_QUERY|  scotthamilton|is upset that he ...|
|          2|1467810917|2009-04-07 06:19:53|NO_QUERY|       mattycus|@Kenichan I dived...|
|          3|1467811184|2009-04-07 06:19:57|NO_QUERY|        ElleCTF|my whole body fee...|
|          4|1467811193|2009-04-07 06:19:57|NO_QUERY|         Karoli|@nationwideclass ...|
+-----------+----------+-------------------+--------+---------------+--------------------+
only showing top 5 rows



## Write df to MongoDB Database

In [7]:
df.write.format("com.mongodb.spark.sql.DefaultSource").mode("overwrite").option("database", "project").option("collection","ProjectTweets").save()

Py4JJavaError: An error occurred while calling o98.save.
: java.lang.ClassNotFoundException: 
Failed to find data source: com.mongodb.spark.sql.DefaultSource. Please find packages at
https://spark.apache.org/third-party-projects.html
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:587)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:675)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:725)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:864)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: com.mongodb.spark.sql.DefaultSource.DefaultSource
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:661)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:661)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:661)
	... 16 more


In [10]:

df.printSchema()

df.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").option("database","project").option("collection", "ProjectTweets").save()

root
 |-- PRIMARY KEY: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



Py4JJavaError: An error occurred while calling o117.save.
: java.lang.ClassNotFoundException: 
Failed to find data source: com.mongodb.spark.sql.DefaultSource. Please find packages at
https://spark.apache.org/third-party-projects.html
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:587)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:675)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:725)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:864)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: com.mongodb.spark.sql.DefaultSource.DefaultSource
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:661)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:661)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:661)
	... 16 more


# Step Two: Exploratory Data Analysis (EDA)

## Checking for Duplicates (based on ID, user and text) and Missing data

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum  # Import the 'col' and 'sum' functions

# Initialize Spark Session
spark = SparkSession.builder.appName("Duplicate and Missing Data Check").getOrCreate()

# Define the columns to check for duplicates and missing values
columns_to_check = ['PRIMARY KEY','ID', 'user', 'text']

# Create a pipeline to check for duplicates and missing values
pipeline_df = df

# Step 1: Remove duplicate records based on specified columns
pipeline_df = pipeline_df.dropDuplicates(subset=columns_to_check)

# Step 2: Check for missing values
missing_counts = pipeline_df.select([col(c).isNull().cast("int").alias(c) for c in columns_to_check]).agg(*[sum(c).alias(c) for c in columns_to_check]).collect()[0]

# Print the results
print("Number of duplicate records removed:", df.count() - pipeline_df.count())

print("Missing value counts:")
for col_name, missing_count in zip(columns_to_check, missing_counts):
    print(col_name, missing_count)



In [None]:
#Checking for duplicates in ID and user name
# Count the number of rows before removing duplicates
count_before = df.count()

# Remove duplicates
df_no_duplicates = df.dropDuplicates()

# Count the number of rows after removing duplicates
count_after = df_no_duplicates.count()

# Calculate the number of duplicates
num_duplicates = count_before - count_after

print(f"Number of duplicate rows removed: {num_duplicates}")


In [None]:
# Count the number of rows before removing duplicates
count_before = df.count()

# Remove duplicates based on a specific column
df_no_duplicates = df.dropDuplicates(subset=['user'])

# Count the number of rows after removing duplicates
count_after = df_no_duplicates.count()

# Calculate the number of duplicates
num_duplicates = count_before - count_after

print(f"Number of duplicate rows removed based on user: {num_duplicates}")


## Summary Statistics

In [None]:
#Summary Statistics
df.describe().show()

## EXTRACTING TIME COMPONENTS

In [None]:
from pyspark.sql.functions import year, month, dayofmonth, hour, minute, second


#Initialize Spark Session
spark = SparkSession.builder.appName("DateTime Visualization").getOrCreate()

# Extract relevant time components including hours, minutes, and seconds
df = df.withColumn("year", year("date")).withColumn("month", month("date")).withColumn("day", dayofmonth("date")).withColumn("hour", hour("date")).withColumn("minute", minute("date")).withColumn("second", second("date"))

# Aggregate data
time_series_data = df.groupBy("date","year", "month", "day", "hour", "minute", "second").count().orderBy("year", "month", "day", "hour", "minute", "second")

In [None]:
#View the df DataFrame after extracting time components
df.show(5)

In [None]:
df.printSchema()

In [None]:
from pyspark.sql.functions import count

# Group by the year variable and count the occurrences
year_counts = df.groupBy("year").agg(count("*").alias("count")).orderBy("year")

# Show the tabulated counts
year_counts.show()


In [None]:
from pyspark.sql.functions import count

# Group by the month variable and count the occurrences
month_counts = df.groupBy("month").agg(count("*").alias("count")).orderBy("month")

# Show the tabulated counts
month_counts.show()

In [None]:
from pyspark.sql.functions import count

# Group by the day variable and count the occurrences
day_counts = df.groupBy("day").agg(count("*").alias("count")).orderBy("day")

# Show the tabulated counts
day_counts.show()

In [None]:
from pyspark.sql.functions import count

# Group by the day variable and count the occurrences
ID_counts = df.groupBy("ID").agg(count("*").alias("count")).orderBy("ID")

# Show the tabulated counts
ID_counts.show()

In [None]:
time_series_data.show(5)

In [None]:
type(time_series_data)

In [None]:
#!pip install nltk
import nltk
nltk.download('vader_lexicon')

In [None]:
from pyspark.ml.feature import RegexTokenizer, CountVectorizer
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from nltk.sentiment.vader import SentimentIntensityAnalyzer

# Initialize Spark Session
spark = SparkSession.builder.appName("Text Analysis").getOrCreate()

# Tokenize text using a regular expression tokenizer
tokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern=r'\W')
df = tokenizer.transform(df)

# Initialize CountVectorizer
cv = CountVectorizer(inputCol="words", outputCol="features", minDF=1, vocabSize=1000)
model = cv.fit(df)
df = model.transform(df)

# Initialize VADER sentiment analyzer
sid = SentimentIntensityAnalyzer()

# Define a UDF to analyze sentiment using VADER
def analyze_sentiment(text):
    sentiment = sid.polarity_scores(text)
    if sentiment['compound'] >= 0.05:
        return 'Positive'
    elif sentiment['compound'] <= -0.05:
        return 'Negative'
    else:
        return 'Neutral'

sentiment_udf = udf(analyze_sentiment, StringType())

# Apply the UDF to analyze sentiment for each comment
df = df.withColumn("sentiment", sentiment_udf(df["text"]))

# Show the DataFrame
df.select("text", "date", "sentiment", "features").show(5)


In [None]:
df.show(1)

In [None]:
#!pip install statsmodels
!pip install plotly

In [None]:
df.printSchema()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, date_format
from pyspark.sql.types import StringType
from nltk.sentiment.vader import SentimentIntensityAnalyzer
import numpy as np
from statsmodels.tsa.arima.model import ARIMA
import plotly.graph_objects as go
import pandas as pd

# Initialize Spark Session
spark = SparkSession.builder.appName("Sentiment Forecast Dashboard").getOrCreate()

# Step 1: Preprocess the data
sid = SentimentIntensityAnalyzer()
analyze_sentiment_udf = udf(lambda text: sid.polarity_scores(text)['compound'], StringType())
df = df.withColumn("sentiment_score", analyze_sentiment_udf(df["text"]))
df = df.withColumn("date_str", date_format("date", "yyyy-MM-dd"))
df_agg = df.groupBy("date_str").agg({"sentiment_score": "mean"}).withColumnRenamed("avg(sentiment_score)", "sentiment_score")

# Step 2: Train a time series forecasting model
# Convert Spark DataFrame to Pandas DataFrame
df_pd = df_agg.toPandas()
df_pd['date'] = pd.to_datetime(df_pd['date_str'])

# Step 3: Make forecasts
# Assuming ARIMA model for simplicity
model = ARIMA(df_pd['sentiment_score'], order=(1,1,1))
fit_model = model.fit()
forecast_1_day = fit_model.forecast(steps=1)
forecast_3_days = fit_model.forecast(steps=3)
forecast_7_days = fit_model.forecast(steps=7)

# Step 4: Display the forecasts (using Plotly for visualization)

# Plot historical sentiment data
fig = go.Figure()
fig.add_trace(go.Scatter(x=df_pd['date'], y=df_pd['sentiment_score'], mode='lines', name='Historical Sentiment'))

# Plot forecasted values
forecast_dates = [df_pd['date'].iloc[-1] + pd.Timedelta(days=i) for i in range(1, 8)]
fig.add_trace(go.Scatter(x=forecast_dates, y=forecast_1_day, mode='lines', name='Forecast (1 day)'))
fig.add_trace(go.Scatter(x=forecast_dates[:3], y=forecast_3_days, mode='lines', name='Forecast (3 days)'))
fig.add_trace(go.Scatter(x=forecast_dates[:7], y=forecast_7_days, mode='lines', name='Forecast (7 days)'))

# Update layout
fig.update_layout(title='Sentiment Forecast Dashboard',
                  xaxis_title='Date',
                  yaxis_title='Sentiment Score')

# Show the dashboard
fig.show()


In [None]:
# Plot historical sentiment data
fig_historical = go.Figure()
fig_historical.add_trace(go.Scatter(x=df_pd['date'], y=df_pd['sentiment_score'], mode='lines', name='Historical Sentiment'))
fig_historical.update_layout(title='Historical Sentiment',
                             xaxis_title='Date',
                             yaxis_title='Sentiment Score')

# Plot forecasted values for 1 day
fig_1_day = go.Figure()
fig_1_day.add_trace(go.Scatter(x=forecast_dates, y=forecast_1_day, mode='lines', name='Forecast (1 day)'))
fig_1_day.update_layout(title='Sentiment Forecast (1 day)',
                        xaxis_title='Date',
                        yaxis_title='Sentiment Score')

# Plot forecasted values for 3 days
fig_3_days = go.Figure()
fig_3_days.add_trace(go.Scatter(x=forecast_dates[:3], y=forecast_3_days, mode='lines', name='Forecast (3 days)'))
fig_3_days.update_layout(title='Sentiment Forecast (3 days)',
                         xaxis_title='Date',
                         yaxis_title='Sentiment Score')

# Plot forecasted values for 7 days
fig_7_days = go.Figure()
fig_7_days.add_trace(go.Scatter(x=forecast_dates[:7], y=forecast_7_days, mode='lines', name='Forecast (7 days)'))
fig_7_days.update_layout(title='Sentiment Forecast (7 days)',
                         xaxis_title='Date',
                         yaxis_title='Sentiment Score')

# Show the separate figures
fig_historical.show()
fig_1_day.show()
fig_3_days.show()
fig_7_days.show()


In [None]:
import plotly.graph_objects as go
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, date_format
from pyspark.sql.types import StringType
import numpy as np
from statsmodels.tsa.arima.model import ARIMA

# Initialize Spark Session
spark = SparkSession.builder.appName("Sentiment Forecast Dashboard").getOrCreate()

# Assuming df is your Spark DataFrame containing the sentiment data

# Step 1: Preprocess the data
sid = SentimentIntensityAnalyzer()
analyze_sentiment_udf = udf(lambda text: sid.polarity_scores(text)['compound'], StringType())
df = df.withColumn("sentiment_score", analyze_sentiment_udf(df["text"]))
df = df.withColumn("month", date_format("date", "MM"))
df = df.withColumn("day", date_format("date", "dd"))
df_agg = df.groupBy("month", "day").agg({"sentiment_score": "mean"}).withColumnRenamed("avg(sentiment_score)", "sentiment_score")

# Step 2: Train a time series forecasting model
# Convert Spark DataFrame to Pandas DataFrame
df_pd = df_agg.toPandas()
df_pd['date'] = pd.to_datetime(df_pd['month'] + '-' + df_pd['day'])

# Step 3: Make forecasts
# Assuming ARIMA model for simplicity
model = ARIMA(df_pd['sentiment_score'], order=(1,1,1))
fit_model = model.fit()
forecast_1_day = fit_model.forecast(steps=1)
forecast_3_days = fit_model.forecast(steps=3)
forecast_7_days = fit_model.forecast(steps=7)

# Step 4: Display the forecasts (using Plotly for visualization)

# Create a dropdown menu for selecting months
month_buttons = [
    {'label': f'Month {month}', 'method': 'update', 'args': [{'visible': [df_pd['month'] == month]}]}
    for month in df_pd['month'].unique()
]

# Plot historical sentiment data
fig = go.Figure()
fig.add_trace(go.Scatter(x=df_pd['day'], y=df_pd['sentiment_score'], mode='lines', name='Historical Sentiment'))

# Plot forecasted values
forecast_dates = [df_pd['date'].iloc[-1] + pd.Timedelta(days=i) for i in range(1, 8)]
fig.add_trace(go.Scatter(x=forecast_dates, y=forecast_1_day, mode='lines', name='Forecast (1 day)'))
fig.add_trace(go.Scatter(x=forecast_dates[:3], y=forecast_3_days, mode='lines', name='Forecast (3 days)'))
fig.add_trace(go.Scatter(x=forecast_dates[:7], y=forecast_7_days, mode='lines', name='Forecast (7 days)'))

# Update layout
fig.update_layout(title='Sentiment Forecast Dashboard',
                  xaxis_title='Day',
                  yaxis_title='Sentiment Score',
                  updatemenus=[{'buttons': month_buttons, 'direction': 'down', 'showactive': True, 'x': 0.5, 'xanchor': 'center'}],
                  showlegend=True)

# Show the dashboard
fig.show()

# Populated the Project Tweets CSV  into the NoSQL DATABASE (MONGODB) USING SPARK

In [None]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("Process data from MongoDB using spark").config("spark.mongodb.input.uri", "mongodb://localhost:27017/project.ProjectTweets").config("spark.mongodb.output.uri", "mongodb://localhost:27017/Project.my_output_ProjectTweets").getOrCreate()

# Read data from MongoDB into a DataFrame
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

In [None]:
# Perform Spark transformations and actions on the DataFrame
# For example:
df_processed = df.select("field1", "field2").filter(df["field1"] > 10)

# Write processed data back to MongoDB
df_processed.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()

# Stop Spark session
spark.stop()

In [None]:
# Stop Spark session
spark.stop()

In [11]:
# Import necessary libraries 
from pyspark.sql import SparkSession

# State the input and output directories of your Database 
input_uri = "mongodb://localhost:27017/project.ProjectTweets"
output_uri = "mongodb://localhost:27017/project.ProjectTweets"

# Make a spark session to use pysaprk and also configure spark to be configured with MongoDB as follows
spark = SparkSession.builder \
    .appName("myProject") \
    .config("spark.mongodb.input.uri", input_uri) \
    .config("spark.mongodb.output.uri", output_uri) \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.2") \
    .getOrCreate()

24/04/30 11:08:47 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [12]:
# Initialize the Spark Session
spark = SparkSession.builder.appName("ProjectTweets CSV to SparkSQL").getOrCreate()

# Set legacy timeParserPolicy
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Define the schema for the csv file 
schema = StructType().add("_c0", StringType(), True).add("_c1", StringType(), True).add("_c2", StringType(), True).add("_c3", StringType(), True).add("_c4", StringType(), True).add("_c5", StringType(), True)

# Read the CSV into a DataFrame called df
df = spark.read.format("csv").option("header", "false").schema(schema).load("file:///home/hduser/ProjectTweets.csv")

# Rename the headers
df = df.withColumnRenamed("_c0", "PRIMARY KEY").withColumnRenamed("_c1", "ID").withColumnRenamed("_c2", "date").withColumnRenamed("_c3", "flag").withColumnRenamed("_c4", "user").withColumnRenamed("_c5", "text")

# Convert string date to TimestampType
df = df.withColumn("date", to_timestamp(df["date"], "EEE MMM dd HH:mm:ss zzzz yyyy"))

# Print schema
df.printSchema()

# Show DataFrame
df.show(5)



24/04/30 11:08:50 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
root
 |-- PRIMARY KEY: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)

+-----------+----------+-------------------+--------+---------------+--------------------+
|PRIMARY KEY|        ID|               date|    flag|           user|                text|
+-----------+----------+-------------------+--------+---------------+--------------------+
|          0|1467810369|2009-04-07 06:19:45|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|          1|1467810672|2009-04-07 06:19:49|NO_QUERY|  scotthamilton|is upset that he ...|
|          2|1467810917|2009-04-07 06:19:53|NO_QUERY|       mattycus|@Kenichan I dived...|
|          3|1467811184|2009-04-07 06:19:57|NO_QUERY|        ElleCTF|my whole body fee...|
|          4|

In [15]:
!pip install pymongo

Defaulting to user installation because normal site-packages is not writeable


In [16]:
from pyspark.sql import SparkSession

In [17]:
spark = SparkSession.builder.appName("Local to MongoDB").getOrCreate()


24/04/30 11:15:42 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [18]:
# Set legacy timeParserPolicy
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Define the schema for the csv file 
schema = StructType().add("_c0", StringType(), True).add("_c1", StringType(), True).add("_c2", StringType(), True).add("_c3", StringType(), True).add("_c4", StringType(), True).add("_c5", StringType(), True)

# Read the CSV into a DataFrame called df
df = spark.read.format("csv").option("header", "false").schema(schema).load("file:///home/hduser/ProjectTweets.csv")

# Rename the headers
df = df.withColumnRenamed("_c0", "PRIMARY KEY").withColumnRenamed("_c1", "ID").withColumnRenamed("_c2", "date").withColumnRenamed("_c3", "flag").withColumnRenamed("_c4", "user").withColumnRenamed("_c5", "text")

# Convert string date to TimestampType
df = df.withColumn("date", to_timestamp(df["date"], "EEE MMM dd HH:mm:ss zzzz yyyy"))

In [21]:
# Set the MongoDB Spark Connector JAR file path
spark_jars = "/home/hduser/Downloads/mongo-spark-connector_2.13-10.2.3.jar"

# Initialize SparkSession with MongoDB Spark Connector JAR
spark = SparkSession.builder.appName("spark-MongoDB").config("spark.jars", spark_jars)    .getOrCreate()


24/04/30 11:39:30 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [23]:
df.write.format("pkg:maven/org.mongodb.spark/mongo-spark-connector_2.13@10.2.3") \
    .mode("append") \
    .option("uri", "mongodb://localhost:27017/project.ProjectTweets") \
    .save()


Py4JJavaError: An error occurred while calling o200.save.
: java.lang.ClassNotFoundException: 
Failed to find data source: pkg:maven/org.mongodb.spark/mongo-spark-connector_2.13@10.2.3. Please find packages at
https://spark.apache.org/third-party-projects.html
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:587)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:675)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:725)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:864)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: pkg:maven/org.mongodb.spark/mongo-spark-connector_2.13@10.2.3.DefaultSource
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:661)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:661)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:661)
	... 16 more


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

if __name__ == "__main__":
    # Create a SparkSession
    spark = SparkSession.\
    builder.appName("MongoDBIntegration").\
    getOrCreate()

    # Read it back from MongoDB into a new Dataframe
    readUsers = spark.read\
    .format("com.mongodb.spark.sql.DefaultSource")\
    .option("uri","mongodb://localhost:27017/project.ProjectTweets")\
    .load()

    readUsers.createOrReplaceTempView("users")

    readUsers.printSchema()

    sqlDF = spark.sql("""
    SELECT occupation,count(user_id) as cnt_usr
    FROM users 
    GROUP BY occupation
    ORDER BY cnt_usr DESC
    """)

    sqlDF.show()


In [26]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions as F  # Import functions module explicitly

def parseInput(line):
    fields = line.split(',')
    return Row(user_id=int(fields[0]), id2=int(fields[1]), date=fields[2], flag=fields[3], user=fields[4], text=fields[5])

if __name__ == "__main__":
    # Create a SparkSession
    spark = SparkSession.builder.appName("MongoDBIntegration").getOrCreate()

    # Read CSV file into a DataFrame
    lines = spark.read.csv("hdfs://localhost:9000/ProjectTweets/ProjectTweets.csv")

    # Creating new DataFrame by applying the parser function
    users = lines.rdd.map(parseInput)

    # Convert RDD into a DataFrame
    usersDataset = spark.createDataFrame(users)

    # Write the data into MongoDB
    usersDataset.write \
        .format("com.mongodb.spark.sql.DefaultSource") \
        .option("uri", "mongodb://localhost:27017/project.ProjectTweets") \
        .mode("overwrite") \
        .save()

                                                                                

24/04/30 12:07:51 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 7)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1897, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'split' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/pyspark/rdd.py", line 1877, in takeUpToNumLeft
    yield next(iterator)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrap

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 7) (10.0.2.15 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1897, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'split' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/pyspark/rdd.py", line 1877, in takeUpToNumLeft
    yield next(iterator)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_6709/2532075986.py", line 6, in parseInput
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1902, in __getattr__
    raise AttributeError(item)
AttributeError: split

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:166)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2278)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2668)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2604)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2603)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2603)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1178)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1178)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1178)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2798)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2787)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1897, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'split' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/pyspark/rdd.py", line 1877, in takeUpToNumLeft
    yield next(iterator)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_6709/2532075986.py", line 6, in parseInput
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1902, in __getattr__
    raise AttributeError(item)
AttributeError: split

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$.$anonfun$runJob$1(PythonRDD.scala:166)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2278)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions

if __name__ == "__main__":
    # Create a SparkSession
    spark = SparkSession.\
    builder.appName("MongoDBIntegration").\
    getOrCreate()

    # Read it back from MongoDB into a new Dataframe
    readUsers = spark.read\
    .format("com.mongodb.spark.sql.DefaultSource")\
    .option("uri","mongodb://127.0.0.1/moviesdata.users")\
    .load()

    readUsers.createOrReplaceTempView("users")

    readUsers.printSchema()

    sqlDF = spark.sql("""
    SELECT occupation,count(user_id) as cnt_usr
    FROM users 
    GROUP BY occupation
    ORDER BY cnt_usr DESC
    """)

    sqlDF.show()