In [1]:
# Import necessary modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DateType
from pymongo import MongoClient
import re
import os
import sys  # Add this import

spark_home, java_home = "C:/Spark", "C:/Java" # Manually set Java and Spark home directory

# Set environment variables
os.environ['SPARK_HOME'] = spark_home
os.environ['JAVA_HOME'] = java_home
os.environ['PYSPARK_SUBMIT_ARGS'] = '--driver-memory 4g --executor-memory 4g pyspark-shell'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = "notebook"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['PYSPARK_PYTHON'] = sys.executable

os.environ['PATH'] = os.path.join(spark_home, 'bin') + os.pathsep + os.environ['PATH'] # Add the bin directory to the PATH

# Create a SparkSession
spark = SparkSession.builder \
    .appName('practice') \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0") \
    .config("spark.driver.host", "localhost").getOrCreate()

# Specify MongoDB connection options
mongo_options = {
    "spark.mongodb.input.uri": "mongodb://localhost:27017/Big_Tweet.Tweets_Data",}

# Read data from MongoDB into a DataFrame
df = spark.read.format("mongo").options(**mongo_options).load()
df.show() # Show the DataFrame

+--------------------+--------------------+--------+----------+--------------------+---------------+
|                 _id|                date|    flag|       ids|                text|           user|
+--------------------+--------------------+--------+----------+--------------------+---------------+
|{65891bb73fbab16a...|Sat Jun 06 17:02:...|NO_QUERY|2059493951|Kinda scared to s...|          l7l7v|
|{65891bb73fbab16a...|Sat Jun 06 17:02:...|NO_QUERY|2059494141|RB TY@threebears:...|    DBSLKitties|
|{65891bb73fbab16a...|Sat Jun 06 17:02:...|NO_QUERY|2059494328|Trying to fix my ...|      AndrLMaxf|
|{65891bb73fbab16a...|Sat Jun 06 17:02:...|NO_QUERY|2059494463|@ether_radio no m...|   mumble_rosie|
|{65891bb73fbab16a...|Sat Jun 06 17:02:...|NO_QUERY|2059494807|i should be in lo...|         caavis|
|{65891bb73fbab16a...|Sat Jun 06 17:02:...|NO_QUERY|2059495145|Is having a weeke...|        lena247|
|{65891bb73fbab16a...|Sat Jun 06 17:03:...|NO_QUERY|2059496112|Going to watch Te...|     ha

In [5]:
# to display the schema
df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- ids: long (nullable = true)
 |-- text: string (nullable = true)
 |-- user: string (nullable = true)



In [6]:
# to display the the distinct values
distinct_flag = df.select("flag").distinct()
distinct_flag.show()

+--------+
|    flag|
+--------+
|NO_QUERY|
+--------+



In [7]:
# to select the specific columns
selected_data = df.select("date", "user", "text")
selected_data.show(5)

+--------------------+------------+--------------------+
|                date|        user|                text|
+--------------------+------------+--------------------+
|Sat Jun 06 17:02:...|       l7l7v|Kinda scared to s...|
|Sat Jun 06 17:02:...| DBSLKitties|RB TY@threebears:...|
|Sat Jun 06 17:02:...|   AndrLMaxf|Trying to fix my ...|
|Sat Jun 06 17:02:...|mumble_rosie|@ether_radio no m...|
|Sat Jun 06 17:02:...|      caavis|i should be in lo...|
+--------------------+------------+--------------------+
only showing top 5 rows



In [8]:
# to count the number of rows in the DataFrame
row_count = selected_data.count()
print("Number of rows:", row_count)

Number of rows: 1600000


### **Dealing with duplicate and null values**

In [9]:
# to count the number of rows in the DataFrame
old_count = selected_data.count()

# Drop the null values from the cleaned data
new_data = selected_data.dropDuplicates()

# to count the number of rows in the DataFrame after removing the duplicates
new_count = new_data.count()

print("Number of rows before removing duplicate :--> ", old_count)
print("Number of rows before after duplicate :-->", new_count)

Number of rows before removing duplicate :-->  1600000
Number of rows before after duplicate :--> 1598127


In [10]:
# Check for null values in a specific column (e.g., "columnName")
column_name = "text"
null_count_in_column = new_data.filter(col(column_name).isNull()).count()

# Show the count of null values in the specific column
print(f"Number of null values in {column_name}: {null_count_in_column}")

Number of null values in text: 0


In [11]:
from pyspark.sql import functions as F

grouped_df = new_data.groupBy("user").agg(F.count("text").alias("tweet_count"))
grouped_df.show()

+--------------+-----------+
|          user|tweet_count|
+--------------+-----------+
|       Daniiej|          3|
|           kdc|          9|
|  jeffreeefans|         39|
|     ASU_ConEd|          1|
|    janiek1981|          2|
|     taylajade|         15|
|   SuperDunner|         14|
|pantherapardus|         18|
|    Peedletuck|          2|
|     elimorris|          1|
|    Calamorama|          1|
|     AlexB1001|          1|
|    macuser612|          2|
|      tippi_jo|          2|
|  ifyuhseekamy|          1|
|   xsweetlukex|          2|
|      chukaman|          7|
|   beesarahlee|          1|
|  reneewandell|          1|
|  KatjaPresnal|          7|
+--------------+-----------+
only showing top 20 rows



## **Dealing with date**

In [12]:
new_data.show(5)

+--------------------+-------------+--------------------+
|                date|         user|                text|
+--------------------+-------------+--------------------+
|Sat Jun 06 17:11:...|zorythevirgin|fml sending alott...|
|Sat Jun 06 17:14:...|       KLoeff|I found yet anoth...|
|Fri May 22 06:43:...|   MisterJLee|Morning sweethear...|
|Fri May 22 06:43:...|      yelshia|Going camping! By...|
|Sat May 30 09:48:...|      franzne|my finger is blee...|
+--------------------+-------------+--------------------+
only showing top 5 rows



In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import unix_timestamp, from_unixtime

# Create a Spark session
spark = SparkSession.builder.appName("DateConversion").getOrCreate()

# Set the legacy time parser policy
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Convert the string to a Unix timestamp
selected_data = new_data.withColumn("unix_timestamp", unix_timestamp("date", "EEE MMM dd HH:mm:ss zzz yyyy"))

# Convert Unix timestamp to the correct timestamp format
selected_data = selected_data.withColumn("formatted_date", from_unixtime("unix_timestamp", "yyyy-MM-dd HH:mm:ss"))

# Drop the original "date" column
selected_data = selected_data.drop("date")

# Rename the "formatted_date" column to "date"
selected_data = selected_data.withColumnRenamed("formatted_date", "date")

# Reorder the columns to have "date" at the first index
selected_data = selected_data.select("date", *selected_data.columns)


In [14]:
# Select the first three columns and create a new DataFrame
cleaned_data = selected_data.select("date", "user", "text")
cleaned_data.show(5)

+-------------------+-------------+--------------------+
|               date|         user|                text|
+-------------------+-------------+--------------------+
|2009-06-07 05:41:00|zorythevirgin|fml sending alott...|
|2009-06-07 05:44:18|       KLoeff|I found yet anoth...|
|2009-05-22 19:13:20|   MisterJLee|Morning sweethear...|
|2009-05-22 19:13:20|      yelshia|Going camping! By...|
|2009-05-30 22:18:16|      franzne|my finger is blee...|
+-------------------+-------------+--------------------+
only showing top 5 rows



In [15]:
# Sort the DataFrame based on the "date" column
sorted_data = cleaned_data.orderBy("date")
sorted_data.show(5)

+-------------------+---------------+--------------------+
|               date|           user|                text|
+-------------------+---------------+--------------------+
|2009-04-07 10:49:45|_TheSpecialOne_|@switchfoot http:...|
|2009-04-07 10:49:49|  scotthamilton|is upset that he ...|
|2009-04-07 10:49:53|       mattycus|@Kenichan I dived...|
|2009-04-07 10:49:57|         Karoli|@nationwideclass ...|
|2009-04-07 10:49:57|        ElleCTF|my whole body fee...|
+-------------------+---------------+--------------------+
only showing top 5 rows



In [48]:
# pip install nltk
# pip install contractions

In [17]:
import re
from pyspark.sql.functions import col, udf, regexp_replace, lower
from pyspark.sql.types import StringType
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
import contractions
from nltk.stem import WordNetLemmatizer

In [19]:
# Define the cleaning function
def clean_text(text):
    # Remove numbers
    cleaned_text = regexp_replace(text, r'\d+', '')
    
    # Remove mentions
    cleaned_text = regexp_replace(cleaned_text, r'@[A-Za-z0-9_]+', '')
    
    # Remove email addresses
    cleaned_text = regexp_replace(cleaned_text, r'([a-zA-Z0-9+._-]+@[a-zA-Z0-9+._-]+\.[a-zA-Z0-9+._-]+)', '')
    
    # Remove hyperlinks
    cleaned_text = regexp_replace(cleaned_text, r'https?://.*[\r\n]*', '')
    cleaned_text = regexp_replace(cleaned_text, r'http?://.*[\r\n]*', '')
    
    # Remove hashtags
    cleaned_text = regexp_replace(cleaned_text, r'#[A-Za-z0-9_]+', '')
    
    # Remove brackets
    cleaned_text = regexp_replace(cleaned_text, r" ?\([^)]+\)", "")
    
    # Remove HTML tags
    cleaned_text = regexp_replace(cleaned_text, r'[<.*?>]+', '')
    
    # Remove specific words
    words_to_remove = [
        "ain't", "aren't", "can't", "can't've", "'cause", "couldn't", "could've",
        "didn't", "doesn't", "don't", "hadn't", "hadn't've", "hasn't", "haven't",
        "he'd", "he'd've", "he'll", "he'll've", "he's", "how'd", "how'd'y", "how'll",
        "how's", "i'd", "i'd've", "i'll", "i'll've", "i'm", "i've", "isn't", "it'd",
        "it'd've", "it'll", "it'll've", "it's", "let's", "ma'am", "mayn't", "might've",
        "mightn't", "mightn't've", "mustn't", "mustn't've", "needn't", "needn't've",
        "o'clock", "oughtn't", "oughtn't've", "shan't", "sha'n't", "shan't've", "she'd",
        "she'd've", "she'll", "she'll've", "she's", "should've", "shouldn't", "shouldn't've",
        "so've", "so's", "that'd", "that'd've", "that's", "there'd", "there'd've", "there's",
        "they'd", "they'd've", "they'll", "they'll've", "they're", "they've", "to've", "wasn't",
        "we'd", "we'd've", "we'll", "we'll've", "we're", "we've", "weren't", "what'll"
    ]
    
    for word in words_to_remove:
        cleaned_text = regexp_replace(cleaned_text, r'\b{}\b'.format(word), '')
    
    # Remove special characters
    cleaned_text = regexp_replace(cleaned_text, r'([_|!|%|^|&|*^|\|~|=|$\|/|.,!?/:;\"\'“\”\’]+)', '')
    
    # Remove double quotes
    cleaned_text = regexp_replace(cleaned_text, r'[""]', '')
    
    # Remove dots
    cleaned_text = regexp_replace(cleaned_text, r'[.|.^]+', '')
    
    # Convert to lowercase
    cleaned_text = lower(cleaned_text)
    
    return cleaned_text

# Apply the cleaning function to the text column
cleaned_data = sorted_data.withColumn('cleaned_text', clean_text(col('text')))

# Show the resulting DataFrame
cleaned_data.show()

+-------------------+---------------+--------------------+--------------------+
|               date|           user|                text|        cleaned_text|
+-------------------+---------------+--------------------+--------------------+
|2009-04-07 10:49:45|_TheSpecialOne_|@switchfoot http:...|                    |
|2009-04-07 10:49:49|  scotthamilton|is upset that he ...|is upset that he ...|
|2009-04-07 10:49:53|       mattycus|@Kenichan I dived...| i dived many tim...|
|2009-04-07 10:49:57|         Karoli|@nationwideclass ...| no  not behaving...|
|2009-04-07 10:49:57|        ElleCTF|my whole body fee...|my whole body fee...|
|2009-04-07 10:50:00|       joy_wolf|@Kwesidei not the...| not the whole crew |
|2009-04-07 10:50:03|           coZZ|@LOLTrish hey  lo...| hey  long time n...|
|2009-04-07 10:50:03|        mybirch|         Need a hug |         need a hug |
|2009-04-07 10:50:05|2Hood4Hollywood|@Tatiana_K nope t...| nope they  have it |
|2009-04-07 10:50:09|        mimismo|@tw

In [None]:
from textblob import TextBlob
import re
import spacy

# Install spaCy model
!python -m spacy download en_core_web_sm

# Load spaCy English model
nlp = spacy.load("en_core_web_sm")

def clean_text_with_spacy(text):
    # Lowercasing, removing mentions, hyperlinks, and special characters
    cleaned_text = ' '.join(TextBlob(text).words.lower())
    
    # Tokenize words
    words = cleaned_text.split()

    # Remove stop words
    stop_words = set(nlp.Defaults.stop_words)
    words = [word for word in words if word.lower() not in stop_words]

    # Apply stemming
    words = [nlp(word)[0].lemma_ for word in words]

    # Join the cleaned and processed words
    cleaned_text = ' '.join(words)

    return cleaned_text

# Apply the cleaning function to the text column
cleaned_data = cleaned_data.withColumn('cleaned_text', clean_text_with_spacy(col('cleaned_text')))

In [None]:
# Show the resulting DataFrame
cleaned_data.show(truncate=False)

In [20]:
# Select the first three columns and create a new DataFrame
cleaned_data = cleaned_data.select("date", "user", "cleaned_text")
cleaned_data.show(5)

+-------------------+---------------+--------------------+
|               date|           user|        cleaned_text|
+-------------------+---------------+--------------------+
|2009-04-07 10:49:45|_TheSpecialOne_|                    |
|2009-04-07 10:49:49|  scotthamilton|is upset that he ...|
|2009-04-07 10:49:53|       mattycus| i dived many tim...|
|2009-04-07 10:49:57|        ElleCTF|my whole body fee...|
|2009-04-07 10:49:57|         Karoli| no  not behaving...|
+-------------------+---------------+--------------------+
only showing top 5 rows



In [55]:
# Check for null values in a specific column (e.g., "columnName")
column_name = "cleaned_text"
null_count_in_column = cleaned_data.filter(col(column_name).isNull()).count()

# Show the count of null values in the specific column
print(f"Number of null values in {column_name}: {null_count_in_column}")

Number of null values in cleaned_text: 0


### **Saving Cleaned Data into MongoDB**

In [None]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("MySQLWriteExample").getOrCreate()

# Specify the MongoDB connection details
mongo_uri = "mongodb://localhost:27017/"
database_name = "Big_Tweet"
collection_name = "Cleaned_Data"

# Write the DataFrame to MongoDB
cleaned_data.write.format("mongo").mode("overwrite").option("uri", mongo_uri + database_name + "." + collection_name).save()

# Stop the Spark session
spark.stop()

### **Saving Cleaned Data into Mysql**

In [None]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("MySQLWriteExample").getOrCreate()

# Assuming 'df' is your DataFrame that you want to write to MySQL

# JDBC connection properties for MySQL
mysql_properties = {
    "driver": "com.mysql.cj.jdbc.Driver",
    "url": "jdbc:mysql://localhost:3306/Big_Data",
    "user": "root",
    "password": "root1234",
    "dbtable": "cleaned_data",
}

# Write DataFrame to MySQL
cleaned_data.write.format("jdbc") \
    .option("url", mysql_properties["url"]) \
    .option("driver", mysql_properties["driver"]) \
    .option("dbtable", mysql_properties["dbtable"]) \
    .option("user", mysql_properties["user"]) \
    .option("password", mysql_properties["password"]) \
    .mode("overwrite")  # Change to "append" if needed

# Stop the Spark session
spark.stop()


### **Saving Cleaned Data into CSV_File**

In [None]:
# Specify the path where you want to save the CSV file
csv_path = "Cleaned_Data.csv"

# Write the DataFrame to CSV
#cleaned_data.write.csv(csv_path, header=True, mode="overwrite")
cleaned_data.coalesce(1).write.csv(csv_path, header=True, mode="overwrite")

In [None]:
#  for the cleaning the text Define the cleaning function
#def clean_text(text):
#
#    #  for the Remove numbers
#    cleaned_text = regexp_replace(text, r'\d+', '')
#    
#    # for the Remove mentions
#    cleaned_text = regexp_replace(cleaned_text, r'@[A-Za-z0-9_]+', '')
#    
#    #  for the Remove email addresses
#    cleaned_text = regexp_replace(cleaned_text, r'([a-zA-Z0-9+._-]+@[a-zA-Z0-9+._-]+\.[a-zA-Z0-9+._-]+)', '')
#    
#    # for the Remove hyperlinks
#    cleaned_text = regexp_replace(cleaned_text, r'https?:\/\/.*[\r\n]*', '')
#    cleaned_text = regexp_replace(cleaned_text, r'http?:\/\/.*[\r\n]*', '')
#    
#    #  for the Remove hashtags
#    cleaned_text = regexp_replace(cleaned_text, r'#[A-Za-z0-9_]+', '')
#    
#    # Remove brackets
#    cleaned_text = regexp_replace(cleaned_text, r" ?\([^)]+\)", "")
#    
#    #  for the Remove HTML tags
#    cleaned_text = regexp_replace(cleaned_text, r'[<.*?>]+', '')
#    
#    # Remove specific words
#    words_to_remove = [
#        "ain't", "aren't", "can't", "can't've", "'cause", "couldn't", "could've",
#        "didn't", "doesn't", "don't", "hadn't", "hadn't've", "hasn't", "haven't",
#        "he'd", "he'd've", "he'll", "he'll've", "he's", "how'd", "how'd'y", "how'll",
#        "how's", "i'd", "i'd've", "i'll", "i'll've", "i'm", "i've", "isn't", "it'd",
#        "it'd've", "it'll", "it'll've", "it's", "let's", "ma'am", "mayn't", "might've",
#        "mightn't", "mightn't've", "mustn't", "mustn't've", "needn't", "needn't've",
#        "o'clock", "oughtn't", "oughtn't've", "shan't", "sha'n't", "shan't've", "she'd",
#        "she'd've", "she'll", "she'll've", "she's", "should've", "shouldn't", "shouldn't've",
#        "so've", "so's", "that'd", "that'd've", "that's", "there'd", "there'd've", "there's",
#        "they'd", "they'd've", "they'll", "they'll've", "they're", "they've", "to've", "wasn't",
#        "we'd", "we'd've", "we'll", "we'll've", "we're", "we've", "weren't", "what'll"
#    ]
#    
#    for word in words_to_remove:
#        cleaned_text = regexp_replace(cleaned_text, r'\b{}\b'.format(word), '')
#    #  for the Remove special characters
#    cleaned_text = regexp_replace(cleaned_text, r'([_|!|%|^|&|*^|\|~|=|$\|/|.,!?/:;\"\'\“\”\’]+)', '')
#    
#    #  for the Remove double quotes
#    cleaned_text = regexp_replace(cleaned_text, r'[""]', '')
#    
#    #  for the Remove dots
#    cleaned_text = regexp_replace(cleaned_text, r'[.|.^]+', '')
#    
#    return cleaned_text
#
## now we Apply the cleaning function to the text column
#df = sorted_data.withColumn('cleaned_text', clean_text(col('text')))
#
##  resulting DataFrame
#df.show()