In [1]:
import pyspark
import pandas as pd

In [2]:
import sys
sys.version

'3.10.0 (tags/v3.10.0:b494f59, Oct  4 2021, 19:00:18) [MSC v.1929 64 bit (AMD64)]'

In [3]:
# Spark Session created
#from pyspark.sql import SparkSession
#spark = SparkSession.builder.getOrCreate()
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("topic_modeling_all_tweets") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()


In [4]:
sqlite_db_path = 'C:\\Users\\nurja\\csc7740\\sqlite_db'


if sqlite_db_path not in sys.path:
    sys.path.append(sqlite_db_path)
    print(f"Added {sqlite_db_path} to sys.path")
    
from sqlite_helpers import Sqlite_db

Added C:\Users\nurja\csc7740\sqlite_db to sys.path


### Import all data, not only covid

In [5]:
import os

def get_all_tweets_from_db(database_name, table_name):
    data_dir = 'C:/Users/nurja/csc7740/COVID19MisinformationPaper/themes_of_misinfo_project_tweets/'
    database_path = os.path.join(data_dir, 'sqlite_db')
    
    my_db = Sqlite_db(os.path.join(database_path, database_name))
    
    # Change the SQL query to select all tweets
    df = my_db.query(f'SELECT * FROM {table_name}') 
    my_db.close()

    return df


In [6]:
all_df = get_all_tweets_from_db('tweets_db_2023-03-12_17-57-49.db','raw_tweets') 
all_df.shape

Loading: C:/Users/nurja/csc7740/COVID19MisinformationPaper/themes_of_misinfo_project_tweets/sqlite_db\tweets_db_2023-03-12_17-57-49.db


(2769978, 16)

In [7]:
#tweets_db_converted= all_df.to_csv("all_tweets.csv", index=False)

In [8]:
df = pd.read_csv("all_tweets.csv")
df.head(1)

Unnamed: 0,TweetID,UserID,Username,Timestamp,Text,State,County,City,Sentiment,COVID-related,AgeGroup,Age_Confidence,Gender,Gender_Confidence,Org_Confidence,Retweet
0,1240790159890747393,2917257384,dawnnodemi,Fri Mar 20 00:00:01 +0000 2020,This is giving me the world coming to an end t...,Louisiana,,,-0.1154,0,19-29,0.7815,female,0.9628,0.0006,1


In [9]:
df.shape

(2769978, 16)

In [10]:
state_counts = df['State'].value_counts()

# Display the result
print(state_counts)

State
Washington    1567973
Louisiana     1202005
Name: count, dtype: int64


In [11]:
# Convert Pandas DataFrame to Spark DataFrame
spark_df = spark.createDataFrame(df)

# Show the Spark DataFrame
spark_df.show(1)

+-------------------+----------+----------+--------------------+--------------------+---------+------+----+---------+-------------+--------+--------------+------+-----------------+--------------+-------+
|            TweetID|    UserID|  Username|           Timestamp|                Text|    State|County|City|Sentiment|COVID-related|AgeGroup|Age_Confidence|Gender|Gender_Confidence|Org_Confidence|Retweet|
+-------------------+----------+----------+--------------------+--------------------+---------+------+----+---------+-------------+--------+--------------+------+-----------------+--------------+-------+
|1240790159890747393|2917257384|dawnnodemi|Fri Mar 20 00:00:...|This is giving me...|Louisiana|   NaN| NaN|  -0.1154|            0|   19-29|        0.7815|female|           0.9628|        6.0E-4|      1|
+-------------------+----------+----------+--------------------+--------------------+---------+------+----+---------+-------------+--------+--------------+------+-----------------+----

In [12]:
# Get the number of rows
num_rows = spark_df.count()
print(f"Number of rows: {num_rows}")
# Get the number of columns
num_columns = len(spark_df.columns)
print(f"Number of columns: {num_columns}")

Number of rows: 2769978
Number of columns: 16


In [13]:
## Data Preprocessing 

from pyspark.ml.feature import Tokenizer, StopWordsRemover

# Tokenize the text
tokenizer = Tokenizer(inputCol="Text", outputCol="words")
tweets_words = tokenizer.transform(spark_df)

# Remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
tweets_filtered = remover.transform(tweets_words)

# Show the preprocessed DataFrame
tweets_filtered.select("Text", "filtered_words").show(truncate=False)


+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Text                                                                                                                                                                                                                                                                                 |filtered_words                                                                                                                                                                                                            

In [14]:
## Data Preprocessing 

from pyspark.ml.feature import CountVectorizer

# Create a CountVectorizer model
cv = CountVectorizer(inputCol="filtered_words", outputCol="features")

# Fit the model to the DataFrame and transform it
cv_model = cv.fit(tweets_filtered)
tweets_dtm = cv_model.transform(tweets_filtered)

# Show the DTM
tweets_dtm.select("Text", "features").show(truncate=False)


+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Text                                                                                                                                                                                                                                                                                 |features                                                                                                                                                                                                                 |
+---

In [15]:
from pyspark.ml.clustering import LDA

# Train the LDA model
lda = LDA(k=5, maxIter=10)  # k is the number of topics
lda_model = lda.fit(tweets_dtm)

# Describe topics
topics = lda_model.describeTopics()
vocab = cv_model.vocabulary
for topic in topics.collect():
    print("Topic {}: {}".format(topic[0], [vocab[idx] for idx in topic[1]]))


Topic 0: ['', 'one', 'people', 'get', 'trump', '&amp;', 'still', 'right', 'person', 'black']
Topic 1: ['', 'like', 'people', 'get', '&amp;', 'i’m', 'trump', 'one', 'it’s', 'don’t']
Topic 2: ['', 'like', 'one', '&amp;', 'love', 'happy', 'got', 'u', 'trump', 'best']
Topic 3: ['need', 'get', 'love', 'people', '', 'it’s', 'first', '&amp;', 'got', 'like']
Topic 4: ['-', '', '&amp;', 'i’m', '.', 'people', 'new', 'love', 'need', 'please']


In [16]:
import nbformat

# Load the notebook file
with open('topic_modeling_all_tweets.ipynb', 'r', encoding='utf-8') as f:
    notebook = nbformat.read(f, as_version=4)

# Count the lines of code in all code cells
code_lines = 0
for cell in notebook['cells']:
    if cell['cell_type'] == 'code':
        code_lines += len(cell['source'].splitlines())

print(f"Total lines of code: {code_lines}")


Total lines of code: 106


In [1]:
import json

def count_spark_operations(notebook_path):
    # Open the notebook file with utf-8 encoding
    with open(notebook_path, 'r', encoding='utf-8') as f:
        notebook = json.load(f)

    spark_operation_count = 0
    for cell in notebook['cells']:
        if cell['cell_type'] == 'code':
            code = cell['source']
            # Check for common Spark operation keywords
            for line in code:
                if any(op in line for op in ['filter', 'select', 'groupBy', 'join', 'map', 'reduce', 'show', 'collect']):
                    spark_operation_count += 1

    return spark_operation_count

# Example usage
notebook_path = 'topic_modeling_all_tweets.ipynb'  # Update this with your notebook path
operation_count = count_spark_operations(notebook_path)
print(f"Number of Spark Operations Found: {operation_count}")


Number of Spark Operations Found: 12
