### PySpark reading a file form a CSV

In [1]:
# import warnings
warnings.filterwarnings("ignore")

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("BigData") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()


24/03/18 00:26:02 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
### Basic - EDA

In [4]:
df_emails = spark.read.format("csv").option("header", "true").load("hdfs://localhost:8020/user1/emails.csv")
df_emails.show()


+--------------------+--------------------+
|                file|             message|
+--------------------+--------------------+
|allen-p/_sent_mai...|Message-ID: <1878...|
|           Date: Mon| 14 May 2001 16:3...|
|From: phillip.all...|                null|
|To: tim.belden@en...|                null|
|           Subject: |                null|
|   Mime-Version: 1.0|                null|
|Content-Type: tex...|                null|
|Content-Transfer-...|                null|
|X-From: Phillip K...|                null|
|X-To: Tim Belden ...|                null|
|              X-cc: |                null|
|             X-bcc: |                null|
|X-Folder: \Philli...| Phillip K.\'Sent...|
|   X-Origin: Allen-P|                null|
|X-FileName: palle...|                null|
|Here is our forecast|                null|
|                   "|                null|
|allen-p/_sent_mai...|Message-ID: <1546...|
|           Date: Fri| 4 May 2001 13:51...|
|From: phillip.all...|          

In [5]:
df_emails.count()


                                                                                

8307068

In [6]:
from pyspark.sql.functions import col, count, when

df_emails.select([count(when(col(c).isNull(), c)).alias(c) for c in df_emails.columns]).show()




+----+-------+
|file|message|
+----+-------+
|7215|5798819|
+----+-------+



                                                                                

In [7]:
# Describe provides summary statistics of numeric columns in a DataFrame
df_emails.describe().show()




+-------+--------------------+--------+
|summary|                file| message|
+-------+--------------------+--------+
|  count|             8299853| 2508249|
|   mean|                 NaN|Infinity|
| stddev|                 NaN|     NaN|
|    min|                  \t|      \t|
|    max|~~~~~~~~~~~~~~~~~...|       ||
+-------+--------------------+--------+



                                                                                

In [8]:
# Show the first few rows
df_emails.show(n=20)


+--------------------+--------------------+
|                file|             message|
+--------------------+--------------------+
|allen-p/_sent_mai...|Message-ID: <1878...|
|           Date: Mon| 14 May 2001 16:3...|
|From: phillip.all...|                null|
|To: tim.belden@en...|                null|
|           Subject: |                null|
|   Mime-Version: 1.0|                null|
|Content-Type: tex...|                null|
|Content-Transfer-...|                null|
|X-From: Phillip K...|                null|
|X-To: Tim Belden ...|                null|
|              X-cc: |                null|
|             X-bcc: |                null|
|X-Folder: \Philli...| Phillip K.\'Sent...|
|   X-Origin: Allen-P|                null|
|X-FileName: palle...|                null|
|Here is our forecast|                null|
|                   "|                null|
|allen-p/_sent_mai...|Message-ID: <1546...|
|           Date: Fri| 4 May 2001 13:51...|
|From: phillip.all...|          

In [15]:
from pyspark.sql.functions import regexp_extract

# Example regular expression pattern for a date in the format "E, dd MMM yyyy HH:mm:ss Z"
# Adjust this pattern to match the actual format found in your 'message' data
date_pattern = r'\bMon, \d{2} \w{3} \d{4} \d{2}:\d{2}:\d{2} -\d{4} \(PDT\)'

# Create a new column 'ExtractedDate' by extracting the date string from 'message'
df_emails = df_emails.withColumn("ExtractedDate", regexp_extract("message", date_pattern, 0))

# Show the result of extraction
df_emails.select("ExtractedDate").show(truncate=False, n=5)


+-------------+
|ExtractedDate|
+-------------+
|             |
|             |
|null         |
|null         |
|null         |
+-------------+
only showing top 5 rows



In [16]:
from pyspark.sql.functions import to_timestamp

# Convert the extracted date string to a timestamp format (adjust the format string as necessary)
df_emails = df_emails.withColumn("DateTimestamp", to_timestamp("ExtractedDate", "E, dd MMM yyyy HH:mm:ss Z"))

# Show the DataFrame with the new timestamp column
df_emails.select("DateTimestamp").show(truncate=False, n=5)


SparkUpgradeException: [INCONSISTENT_BEHAVIOR_CROSS_VERSION.DATETIME_PATTERN_RECOGNITION] You may get a different result due to the upgrading to Spark >= 3.0:
Fail to recognize 'E, dd MMM yyyy HH:mm:ss Z' pattern in the DateTimeFormatter. 1) You can set "spark.sql.legacy.timeParserPolicy" to "LEGACY" to restore the behavior before Spark 3.0. 2) You can form a valid datetime pattern with the guide from 'https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html'.

In [17]:
# Sort the DataFrame by the timestamp column
df_emails_sorted = df_emails.orderBy("DateTimestamp", ascending=False)

# Display the sorted DataFrame
df_emails_sorted.show(n=5, truncate=False)


SparkUpgradeException: [INCONSISTENT_BEHAVIOR_CROSS_VERSION.DATETIME_PATTERN_RECOGNITION] You may get a different result due to the upgrading to Spark >= 3.0:
Fail to recognize 'E, dd MMM yyyy HH:mm:ss Z' pattern in the DateTimeFormatter. 1) You can set "spark.sql.legacy.timeParserPolicy" to "LEGACY" to restore the behavior before Spark 3.0. 2) You can form a valid datetime pattern with the guide from 'https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html'.

In [10]:
# Print the schema of the DataFrame
df_emails.printSchema()


root
 |-- file: string (nullable = true)
 |-- message: string (nullable = true)



In [11]:
df_emails.show(truncate=False, n=100)


+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|file                                                                                                                                                                                                                                                                                                                                 |message                                      

In [12]:
from pyspark.sql.functions import col

df_emails.groupBy("From").count().orderBy(col("count").desc()).show(truncate=False)


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `From` cannot be resolved. Did you mean one of the following? [`file`, `message`].;
'Aggregate ['From], ['From, count(1) AS count#256L]
+- Relation [file#17,message#18] csv


In [13]:
from pyspark.sql.functions import desc, col

# Filter out emails with blank 'From' field
filtered_df_emails = df_emails.filter(col("From") != "")

# Aggregate data in Spark to find top 10 email senders, excluding blanks
top_senders_df = filtered_df_emails.groupBy("From").count().orderBy(desc("count")).limit(10)

# Collecting the aggregated DataFrame to Pandas for visualization
top_senders_pdf = top_senders_df.toPandas()

# Plotting with Matplotlib
top_senders_pdf.plot(kind='bar', x='From', y='count', legend=None, figsize=(10, 6))
plt.title('Top 10 Email Senders (Excluding Blanks)')
plt.xlabel('Sender')
plt.ylabel('Number of Emails')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()  # Adjust layout to make room for the rotated x-axis labels
plt.show()


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `From` cannot be resolved. Did you mean one of the following? [`file`, `message`].;
'Filter NOT ('From = )
+- Relation [file#17,message#18] csv


In [14]:
df_emails.printSchema()


root
 |-- file: string (nullable = true)
 |-- message: string (nullable = true)



### Text Preprocessing


In [None]:
from pyspark.sql.functions import regexp_replace

# Remove unnecessary characters (numbers and punctuation)
df_cleaned = df_emails.withColumn("CleanedMessage", regexp_replace("Message", "[^a-zA-Z\\s]", ""))

# Optionally, you might want to lower the case
from pyspark.sql.functions import lower
df_cleaned = df_cleaned.withColumn("CleanedMessage", lower(col("CleanedMessage")))


In [None]:
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol="CleanedMessage", outputCol="Tokens")
df_tokens = tokenizer.transform(df_cleaned)


In [None]:
from pyspark.ml.feature import StopWordsRemover

remover = StopWordsRemover(inputCol="Tokens", outputCol="FilteredTokens")
df_filtered = remover.transform(df_tokens)


In [None]:
# For CountVectorizer
from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol="FilteredTokens", outputCol="Features")
model = cv.fit(df_filtered)
df_features = model.transform(df_filtered)

# Optionally, for TF-IDF
from pyspark.ml.feature import HashingTF, IDF

hashingTF = HashingTF(inputCol="FilteredTokens", outputCol="RawFeatures")
featurizedData = hashingTF.transform(df_filtered)

idf = IDF(inputCol="RawFeatures", outputCol="Features")
idfModel = idf.fit(featurizedData)
df_features = idfModel.transform(featurizedData)


In [None]:
(trainingData, testData) = df_features.randomSplit([0.7, 0.3], seed=100)


In [None]:
# sc master - running locally
sc.master

In [None]:
# The inferred schema can be visualized using the printSchema() method
df.printSchema()

In [None]:
def parse_message(message):
    # Initialize a dictionary to hold the parsed data
    parsed_data = {
        "MessageID": "",
        "Date": "",
        "From": "",
        "To": "",
        "Subject": ""
    }
    
    # Split the message into lines for processing
    lines = message.split("\n")
    for line in lines:
        if line.startswith("Message-ID:"):
            parsed_data["MessageID"] = line[len("Message-ID:"):].strip()
        elif line.startswith("Date:"):
            parsed_data["Date"] = line[len("Date:"):].strip()
        elif line.startswith("From:"):
            parsed_data["From"] = line[len("From:"):].strip()
        elif line.startswith("To:"):
            parsed_data["To"] = line[len("To:"):].strip()
        elif line.startswith("Subject:"):
            parsed_data["Subject"] = line[len("Subject:"):].strip()
        # Add more conditions as needed for other fields
    
    return parsed_data


In [None]:
def show_first_10_messages(df):
    # Assuming 'df' is your DataFrame and it has a column named 'message'
    # This will show the first 10 rows of the 'message' column
    df.select("message").show(10, truncate=False)

# Call the function with your DataFrame
show_first_10_messages(df)


In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType

# Define your schema as before
schema = StructType([
    StructField("MessageID", StringType(), True),
    StructField("Date", StringType(), True),
    StructField("From", StringType(), True),
    StructField("To", StringType(), True),
    StructField("Subject", StringType(), True),
    # Add other fields as necessary
])

def parse_message(message):
    # Initialize the dictionary with default empty strings
    parsed_data = {
        "MessageID": "",
        "Date": "",
        "From": "",
        "To": "",
        "Subject": ""
    }
    
    # Proceed only if message is not None and is a string
    if message and isinstance(message, str):
        lines = message.split('\n')
        for line in lines:
            if line.startswith("Message-ID:"):
                parsed_data["MessageID"] = line.split(":", 1)[1].strip()
            elif line.startswith("Date:"):
                parsed_data["Date"] = line.split(":", 1)[1].strip()
            elif line.startswith("From:"):
                parsed_data["From"] = line.split(":", 1)[1].strip()
            elif line.startswith("To:"):
                parsed_data["To"] = line.split(":", 1)[1].strip()
            elif line.startswith("Subject:"):
                parsed_data["Subject"] = line.split(":", 1)[1].strip()
            # Continue with other headers as needed
    
    return parsed_data

# Register the UDF with the modified parse_message function
parse_message_udf = udf(parse_message, schema)

# Apply the UDF to your DataFrame as before
df_parsed = df.withColumn("parsed_message", parse_message_udf(df["message"]))


In [None]:
def show_specific_message(df, index):
    """
    Displays a specific message from the DataFrame based on the provided index.

    Parameters:
    - df: The Spark DataFrame containing the messages.
    - index: The index (row number) of the message to display.
    """
    # Ensure the DataFrame has a column named 'message'
    if 'message' in df.columns:
        # Collect the row of interest into a list
        message_row = df.select("message").collect()[index]
        
        # Extract the message from the row and print it
        message_content = message_row["message"]
        print(f"Message at index {index}:\n{message_content}")
    else:
        print("The DataFrame does not contain a column named 'message'.")

# Example usage:
# Assuming 'df' is your DataFrame and you want to see the first message
show_specific_message(df, 0)


In [None]:
!pip install scikit-learn

In [None]:
import pandas as pd
import csv
import re

# Load the dataset with the correct parameters for handling potential parsing issues
df_emails = pd.read_csv('emails.csv', quoting=csv.QUOTE_NONE, on_bad_lines='skip', escapechar="\\")

# Removing quotes from column names if they exist
df_emails.columns = df_emails.columns.str.replace('"', '')

# Handling Missing Values
df_emails.fillna('', inplace=True)

# Text Preprocessing Function
def preprocess_text(text):
    # Convert text to lowercase
    text = text.lower()
    # Remove email headers or unnecessary metadata (for demonstration, might need customization)
    text = re.sub(r'^[a-z]+:.*$', '', text)  # Remove lines that start with metadata-like patterns
    text = re.sub(r'\s+', ' ', text)  # Replace multiple whitespace with single space
    # Remove special characters (customize based on the dataset and needs)
    text = re.sub(r'[^a-z0-9\s]', '', text)
    return text.strip()

# Apply text preprocessing to the 'message' column
df_emails['message'] = df_emails['message'].apply(preprocess_text)

# Display the first few rows of the cleaned dataframe
df_emails.head(15)


In [None]:
import pandas as pd

# Mock example of a structured dataframe
data = {
    'body': ['This is the first email content.', 'Here is another email, potentially suspicious.', 'This email is safe and informative.']
}
emails_structured_df = pd.DataFrame(data)

# Assuming the vectorization function from the previous message, apply it here:
from sklearn.feature_extraction.text import TfidfVectorizer

def vectorize_texts(texts):
    vectorizer = TfidfVectorizer(
        lowercase=True,
        stop_words='english',
        max_features=10000,
        min_df=1,
        max_df=0.9
    )
    X = vectorizer.fit_transform(texts)
    return X, vectorizer

# Apply vectorization to the 'body' column of emails_structured_df
texts = emails_structured_df['body'].tolist()
X, vectorizer = vectorize_texts(texts)

print("Vectorization Complete. Shape of X:", X.shape)


In [None]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.naive_bayes import MultinomialNB

def evaluate_model(X, y):
    """
    Splits the data into training and testing sets, trains a model, and evaluates its performance.
    
    Parameters:
    X (sparse matrix): The feature matrix obtained from vectorizing the text data.
    y (array-like): The target labels indicating the class of each document.
    
    Returns:
    A dictionary containing the model's performance metrics: accuracy, precision, recall, and F1 score.
    """
    
    # Split the data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
    
    # Initialize and train the model
    model = MultinomialNB()
    model.fit(X_train, y_train)
    
    # Make predictions on the testing set
    y_pred = model.predict(X_test)
    
    # Calculate performance metrics
    metrics = {
        'accuracy': accuracy_score(y_test, y_pred),
        'precision': precision_score(y_test, y_pred, average='weighted'),
        'recall': recall_score(y_test, y_pred, average='weighted'),
        'f1_score': f1_score(y_test, y_pred, average='weighted')
    }
    
    return metrics

# Example usage
# Assume y is your array of labels for the dataset, with 1 indicating suspicious and 0 indicating not suspicious
# y = [1, 0, 1, ...]  # This should be the actual labels for your dataset
# metrics = evaluate_model(X, y)
# print(metrics)


In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

def create_model(input_dim):
    # Create a Sequential model
    model = Sequential()
    # Add layers to the model
    model.add(Dense(128, activation='relu', input_dim=input_dim))
    model.add(Dense(64, activation='relu'))
    model.add(Dense(1, activation='sigmoid')) # Use 'sigmoid' for binary classification
    
    # Compile the model
    model.compile(optimizer='adam',
                  loss='binary_crossentropy',
                  metrics=['accuracy', 'Precision', 'Recall'])
    return model


In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

# Mock data for demonstration purposes
# Replace these with your actual vectorized data (X) and labels (y)
X = np.random.rand(100, 20)  # Example feature matrix with 100 samples and 20 features
y = np.random.randint(2, size=100)  # Example binary labels

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)


In [None]:
def create_model(input_dim):
    model = Sequential()
    model.add(Dense(128, activation='relu', input_dim=input_dim))
    model.add(Dense(64, activation='relu'))
    model.add(Dense(1, activation='sigmoid'))  # Sigmoid activation for binary classification
    model.compile(optimizer='adam',
                  loss='binary_crossentropy',  # Use binary_crossentropy for binary classification
                  metrics=['accuracy', 'Precision', 'Recall'])
    return model


In [None]:
# Now that X_train is defined, we can proceed to use it
input_dim = X_train.shape[1]  # Number of features from the vectorized data
model = create_model(input_dim)

# Train the model
history = model.fit(X_train, y_train, epochs=100, batch_size=32, validation_data=(X_test, y_test))


In [None]:
# Example new data
new_data = ["This is a new email conversation.", "Another suspicious email detected!", "Normal conversation."]

# Preprocess and vectorize new data
new_data_processed = [preprocess_text(text) for text in new_data]  # Using the same preprocess_text function from before
new_data_vectorized = vectorizer.transform(new_data_processed)  # Use the same vectorizer fitted on the training data


In [None]:
def prepare_and_predict(new_data, vectorizer, model):
    # Preprocess new data
    new_data_processed = [preprocess_text(text) for text in new_data]
    
    # Vectorize new data using the same vectorizer instance used for training
    new_data_vectorized = vectorizer.transform(new_data_processed)
    
    # IMPORTANT: Artificially ensure the shape matches the expected input of the model
    # This step is hypothetical and serves to illustrate the concept
    # In practice, ensure your data vectorization matches the training phase accurately
    if new_data_vectorized.shape[1] < 20:
        # Assuming the missing features can be set to 0 (this is a strong assumption and may not be valid)
        additional_zeros = np.zeros((new_data_vectorized.shape[0], 20 - new_data_vectorized.shape[1]))
        new_data_vectorized = np.hstack((new_data_vectorized.toarray(), additional_zeros))
    
    predictions = model.predict(new_data_vectorized)
    predicted_probabilities = predictions.flatten()
    return predicted_probabilities

# Example usage
new_data = ["This is a new email conversation.", "Another suspicious email detected!", "Normal conversation."]
predicted_probabilities = prepare_and_predict(new_data, vectorizer, model)
print(predicted_probabilities)


In [None]:
import matplotlib.pyplot as plt

def visualize_predictions(emails, probabilities):
    """
    Visualizes the predicted probabilities of emails being suspicious or fraudulent.

    Parameters:
    - emails: A list of email texts or subjects being analyzed.
    - probabilities: A list of probabilities corresponding to the likelihood of each email being suspicious.
    """
    # Ensure the lists have the same length
    assert len(emails) == len(probabilities), "Emails and probabilities lists must have the same length."

    # Creating the bar plot
    plt.figure(figsize=(10, 6))
    plt.barh(emails, probabilities, color='skyblue')
    plt.xlabel('Probability of Being Suspicious')
    plt.title('Predicted Probabilities of Emails Being Suspicious or Fraudulent')
    for index, value in enumerate(probabilities):
        plt.text(value, index, f"{value:.2f}")
    plt.xlim(0, 1)  # Assuming probabilities range from 0 to 1
    plt.show()

# Example usage:
emails = ["New email conversation", "Suspicious email detected", "Normal conversation"]
probabilities = [0.34725702, 0.08264993, 0.34725702]
visualize_predictions(emails, probabilities)


In [None]:
def visualize_fraud_predictions(email_texts, predictions):
    """
    Visualize the emails with their predicted fraud probabilities.

    Parameters:
    - email_texts: List of email text content.
    - predictions: List of predicted probabilities corresponding to the fraud likelihood of each email.

    The function doesn't return anything but prints each email with its fraud prediction.
    """
    for email, probability in zip(email_texts, predictions):
        print("Email Content:\n", email)
        print("Fraud Likelihood: {:.2%}".format(probability))
        print("-" * 100)

# Example usage
email_texts = [
    "This is a new email conversation.",
    "Another suspicious email detected!",
    "Normal conversation."
]
predicted_probabilities = [0.34725702, 0.08264993, 0.34725702]

visualize_fraud_predictions(email_texts, predicted_probabilities)


In [None]:
def filter_emails_by_similarity_and_likelihood(emails, similarity_threshold=0.5, likelihood_threshold=8.0):
    """
    Filters emails based on content similarity to a given phrase and a likelihood threshold.
    
    Parameters:
    - emails: List of dictionaries, where each dictionary contains 'content' and 'likelihood' keys.
    - similarity_threshold: A threshold for determining content similarity (not used in this simple example).
    - likelihood_threshold: The minimum likelihood score for an email to be considered suspicious.
    
    Returns:
    - A list of emails considered suspicious based on the likelihood threshold.
    """
    suspicious_phrase = "Another suspicious email detected!"
    filtered_emails = [email for email in emails if suspicious_phrase in email['content'] and email['likelihood'] >= likelihood_threshold]
    return filtered_emails

# Example usage:
emails = [
    {'content': "This is a normal email content.", 'likelihood': 2.0},
    {'content': "Another suspicious email detected! Please check it out.", 'likelihood': 8.26},
    {'content': "Another suspicious email detected! This seems like a scam.", 'likelihood': 9.5},
    {'content': "This is another normal conversation.", 'likelihood': 3.2}
]

# Filtering emails:
suspicious_emails = filter_emails_by_similarity_and_likelihood(emails, likelihood_threshold=8.0)

# Displaying the filtered, suspicious emails:
for email in suspicious_emails:
    print(f"Email Content: {email['content']}")
    print(f"Fraud Likelihood: {email['likelihood']}%")
    print("-"*80)


In [None]:
import pandas as pd

def filter_emails_by_keywords(filename, keywords):
    # Load the dataset
    df_emails = pd.read_csv(filename, usecols=['message'])
    # Convert the keywords to lowercase
    keywords = [keyword.lower() for keyword in keywords]
    # Filter the emails
    filtered_emails = df_emails[df_emails['message'].str.lower().apply(lambda x: any(keyword in x for keyword in keywords))]
    return filtered_emails

# Example usage
filename = 'emails.csv'
keywords = ['dinheiro']
filtered_emails = filter_emails_by_keywords(filename, keywords)

# Safely attempt to sample from the filtered DataFrame
if len(filtered_emails) >= 100:
    print(filtered_emails.sample(100))
elif len(filtered_emails) > 0:
    print(filtered_emails.sample(len(filtered_emails)))  # Sample whatever number of rows are available
else:
    print("No emails match the specified keywords.")


In [None]:
import pandas as pd

# Load your dataset
df_emails = pd.read_csv('emails.csv')

# List of row indices you want to view, adjust these indices according to your needs
indices = [454084, 437156, 440097]  # Replace these with the indices you're interested in

# Iterate through the list of indices and print each corresponding message
for index in indices:
    print(f"Message at index {index}:")
    print(df_emails.loc[index, 'message'])
    print("-" * 100)  # Print a separator for readability
