In [10]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import StructType, StructField, DoubleType

In [11]:
df = pd.read_csv('train.csv')

In [12]:
df.head(1)

Unnamed: 0.1,Unnamed: 0,ID,name,category,main_category,currency,deadline,goal,launched,pledged,state,backers,country,usd pledged
0,92411,65378302,Tabletop Tyrant Miniature Wargaming Storage / ...,Tabletop Games,Games,GBP,2016-02-27 22:21:00,1000.0,2016-01-28 22:21:00,7529.0,successful,206.0,GB,10798.12


In [13]:
df.isnull().sum()

Unnamed: 0       0
ID               0
name             3
category         0
main_category    0
currency         0
deadline         0
goal             0
launched         0
pledged          0
state            0
backers          0
country          0
usd pledged      0
dtype: int64

In [14]:
spark=SparkSession.builder\
    .master("local[*]")\
    .appName("NaiveBayes")\
    .getOrCreate()
sc=spark.sparkContext

In [15]:
# Prepare data for training

# Drop unnecessary columns
df.drop(['ID', 'name', 'usd pledged', 'Unnamed: 0'], axis=1, inplace=True)

# Drop rows with missing values
df.dropna(inplace=True)

# Calculate the project duration in days
df['deadline'] = pd.to_datetime(df['deadline'])
df['launched'] = pd.to_datetime(df['launched'])
df['duration'] = (df['deadline'] - df['launched']).dt.days
df.drop(['deadline', 'launched'], axis=1, inplace=True)

# convert all strings to lowercase
df['category'] = df['category'].str.lower()
df['main_category'] = df['main_category'].str.lower()
df['currency'] = df['currency'].str.lower()
df.drop(['category'], axis=1, inplace=True)

# Change the state column to 0 and 1
df["state"] = df["state"].map({"failed": 0, "successful": 1})

In [16]:
df.head(1)

Unnamed: 0,main_category,currency,goal,pledged,state,backers,country,duration
0,games,gbp,1000.0,7529.0,1,206.0,GB,30


In [17]:
df = pd.get_dummies(df, columns=["main_category", "currency", "country"])

In [18]:
df.columns

Index(['goal', 'pledged', 'state', 'backers', 'duration', 'main_category_art',
       'main_category_comics', 'main_category_crafts', 'main_category_dance',
       'main_category_design', 'main_category_fashion',
       'main_category_film & video', 'main_category_food',
       'main_category_games', 'main_category_journalism',
       'main_category_music', 'main_category_photography',
       'main_category_publishing', 'main_category_technology',
       'main_category_theater', 'currency_aud', 'currency_cad', 'currency_chf',
       'currency_dkk', 'currency_eur', 'currency_gbp', 'currency_hkd',
       'currency_jpy', 'currency_mxn', 'currency_nok', 'currency_nzd',
       'currency_sek', 'currency_sgd', 'currency_usd', 'country_AT',
       'country_AU', 'country_BE', 'country_CA', 'country_CH', 'country_DE',
       'country_DK', 'country_ES', 'country_FR', 'country_GB', 'country_HK',
       'country_IE', 'country_IT', 'country_JP', 'country_LU', 'country_MX',
       'country_NL', 'coun

In [19]:
df.head(6)

Unnamed: 0,goal,pledged,state,backers,duration,main_category_art,main_category_comics,main_category_crafts,main_category_dance,main_category_design,...,country_JP,country_LU,country_MX,country_NL,country_NO,country_NZ,country_None,country_SE,country_SG,country_US
0,1000.0,7529.0,1,206.0,30,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,7000.0,331.0,0,6.0,58,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1
2,490000.0,10.0,0,1.0,30,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1
3,6500.0,7230.0,1,219.0,30,1,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1
4,400.0,455.0,1,15.0,12,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,1
5,25000.0,31.0,0,3.0,30,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [22]:
val = pd.read_csv('val.csv')
# Prepare data for training

# Drop unnecessary columns
val.drop(['ID', 'name', 'usd pledged', 'Unnamed: 0'], axis=1, inplace=True)

# Drop rows with missing values
val.dropna(inplace=True)

# Calculate the project duration in days
val['deadline'] = pd.to_datetime(val['deadline'])
val['launched'] = pd.to_datetime(val['launched'])
val['duration'] = (val['deadline'] - val['launched']).dt.days
val.drop(['deadline', 'launched'], axis=1, inplace=True)

# convert all strings to lowercase
val['category'] = val['category'].str.lower()
val['main_category'] = val['main_category'].str.lower()
val['currency'] = val['currency'].str.lower()
val.drop(['category'], axis=1, inplace=True)

# Change the state column to 0 and 1
val["state"] = val["state"].map({"failed": 0, "successful": 1})
val = pd.get_dummies(val, columns=["main_category", "currency", "country"])

In [20]:
# Define the map function
def map_function(line):
    # Extract the features and target from the line
    X = line[:-1]
    y = int(line[-1])

    # Create a list of tuples to store the feature index, value, target, and count
    tuples = []

    # Loop through each feature index and value
    for i, x in enumerate(X):
        # Append a tuple of feature index, value, target, and count to the list
        tuples.append(((i, x, y), 1))

    # Return the list of tuples
    return tuples

# Define the reduce function
def reduce_function(a, b):
    # Sum the counts for each tuple
    return a + b

In [21]:
spark_df = spark.createDataFrame(df)

In [23]:
spark_val = spark.createDataFrame(val)

In [24]:
train_rdd = spark_df.rdd
val_rdd = spark_val.rdd

In [25]:
# Apply the map function to the data RDD
mapped_rdd = train_rdd.flatMap(map_function)

In [26]:
# Apply the reduce function to count the occurrences of each tuple
reduced_rdd = mapped_rdd.reduceByKey(reduce_function)

In [27]:
# Collect the frequency table as a dictionary
frequency_table = reduced_rdd.collectAsMap()

23/05/15 18:31:45 WARN TaskSetManager: Stage 0 contains a task of very large size (2630 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [28]:
def predict(line, frequency_table):
    frequency_table = frequency_table.value

    # Extract the features and target from the line
    X = line[:-1]
    y = int(line[-1])

    # Initialize an empty list to store the posterior probabilities for each class label
    posteriors = []

    # Loop through each class label (0 or 1)
    for label in [0, 1]:
        # Initialize the posterior probability as the prior probability of the class label with Laplace smoothing
        posterior = (frequency_table[(-1, label)] + 1) / (frequency_table[(-1, -1)] + 2)

        # Loop through each feature index and value
        for i, x in enumerate(X):
            x = float(x)
            try:
                posterior *= (frequency_table[(i, x, label)] + 1) / (frequency_table[(-1, label)] + len(frequency_table))
            except:
                #print("Error: Feature %d value %s not in training data" % (i, x))
                posterior *= (0 + 1) / (frequency_table[(-1, label)] + len(frequency_table))

        # Append the posterior probability to the list
        posteriors.append(posterior)

    # Choose the class label with the highest posterior probability as the prediction
    prediction = np.argmax(posteriors)

    # Return a tuple of true label and predicted label
    return (float(y), float(prediction))

In [29]:
# Add a special key-value pair to store the total number of records in the train RDD
frequency_table[(-1, -1)] = train_rdd.count()
# Add another special key-value pair to store the number of records with each class label in the train RDD
frequency_table[(-1, 0)] = train_rdd.filter(lambda row: row.state == 0).count()
frequency_table[(-1, 1)] = train_rdd.filter(lambda row: row.state == 1).count()

bcast = spark.sparkContext.broadcast(frequency_table)

# Apply the predict function on the test RDD using map and pass the frequency_table as an argument using broadcast
predictions = val_rdd.map(lambda line: predict(line, bcast))
# Evaluate the accuracy of the predictions using MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="true_label", predictionCol="predicted_label", metricName="accuracy")
# Define the schema of the predictions DataFrame
schema = StructType([
  StructField("true_label", DoubleType(), True),
  StructField("predicted_label", DoubleType(), True)
])
# Create a DataFrame from the predictions RDD
predictions = predictions.toDF(schema=schema)

23/05/15 18:32:27 WARN TaskSetManager: Stage 2 contains a task of very large size (2630 KiB). The maximum recommended task size is 1000 KiB.
23/05/15 18:32:28 WARN TaskSetManager: Stage 3 contains a task of very large size (2630 KiB). The maximum recommended task size is 1000 KiB.
23/05/15 18:32:29 WARN TaskSetManager: Stage 4 contains a task of very large size (2630 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [31]:
accuracy = evaluator.evaluate(predictions)
print(f"Validation set accuracy = {accuracy}")



Validation set accuracy = 0.785493151305556


                                                                                