In [None]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnan, count, avg
from pyspark.sql.types import IntegerType, FloatType, DateType
from pyspark.sql.functions import when, col, count, sum, desc
import matplotlib.pyplot as plt




In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=7b555c8d0f7816ca13c388a1bd30ee9b087b3643653b7e9b2730f650262bdaf3
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [32]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum, count, when
from pyspark.sql.types import DateType, FloatType

# Create a Spark session
spark = SparkSession.builder.appName("KickstarterAnalysis").getOrCreate()

# Load the data
df = spark.read.csv("/content/kick_starter.csv", header=True, inferSchema=True)

# Display schema and initial data
df.printSchema()
df.show(5)



root
 |-- ID: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- main_category: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- deadline: string (nullable = true)
 |-- goal: string (nullable = true)
 |-- launched: string (nullable = true)
 |-- pledged: string (nullable = true)
 |-- state: string (nullable = true)
 |-- backers: string (nullable = true)
 |-- country: string (nullable = true)
 |-- usd pledged: string (nullable = true)
 |-- usd_pledged_real: string (nullable = true)
 |-- usd_goal_real: string (nullable = true)

+----------+--------------------+--------------+-------------+--------+----------+--------+-------------------+-------+--------+-------+-------+-----------+----------------+-------------+
|        ID|                name|      category|main_category|currency|  deadline|    goal|           launched|pledged|   state|backers|country|usd pledged|usd_pledged_real|usd_goal_real|
+----------+----

In [14]:
# Number of rows
num_rows = df.count()

# Number of columns
num_columns = len(df.columns)

shape = (num_rows, num_columns)

print(f"Shape of the DataFrame: {shape}")

Shape of the DataFrame: (378661, 15)


In [24]:
# Count null values for each column before applying any data transformations
null_counts = df.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df.columns])
null_counts.show()

+---+----+--------+-------------+--------+--------+----+--------+-------+-----+-------+-------+-----------+----------------+-------------+-----+
| ID|name|category|main_category|currency|deadline|goal|launched|pledged|state|backers|country|usd pledged|usd_pledged_real|usd_goal_real|label|
+---+----+--------+-------------+--------+--------+----+--------+-------+-----+-------+-------+-----------+----------------+-------------+-----+
|  0|   4|       0|            0|       0|    1292|1292|    1109|   1109|    0|      0|      0|       4895|             193|           19|    0|
+---+----+--------+-------------+--------+--------+----+--------+-------+-----+-------+-------+-----------+----------------+-------------+-----+



In [33]:
# Remove duplicates
df = df.dropDuplicates()

# Converting date columns to Date Type
df = df.withColumn("deadline", col("deadline").cast(DateType())) \
       .withColumn("launched", col("launched").cast(DateType()))

# Replace null values with the average value for numerical columns
numerical_columns = [col_name for col_name, dtype in df.dtypes if dtype in ['int', 'double', 'float']]
for col_name in numerical_columns:
    avg_value = df.select(avg(col_name)).first()[0]
    df = df.na.fill({col_name: avg_value})




In [30]:
#to ensure that there are no null values remaining
null_counts = df.select([sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df.columns])
null_counts.show()

+---+----+--------+-------------+--------+--------+----+--------+-------+-----+-------+-------+-----------+----------------+-------------+-----+
| ID|name|category|main_category|currency|deadline|goal|launched|pledged|state|backers|country|usd pledged|usd_pledged_real|usd_goal_real|label|
+---+----+--------+-------------+--------+--------+----+--------+-------+-----+-------+-------+-----------+----------------+-------------+-----+
|  0|   4|       0|            0|       0|    1292|   0|    1109|      0|    0|      0|      0|          0|               0|            0|    0|
+---+----+--------+-------------+--------+--------+----+--------+-------+-----+-------+-------+-----------+----------------+-------------+-----+



In [34]:
# Prepare Target Columns

# 1) Drop live projects
df = df.filter(df.state != "live")
# 2) count "successful" states as outcome = 1, otherwise will be a zero
df = df.withColumn("label", when(df.state == "successful", 1).otherwise(0))

# Convert columns to Float
df = df.withColumn("goal", df["goal"].cast(FloatType()))
df = df.withColumn("pledged", df["pledged"].cast(FloatType()))
df = df.withColumn("usd pledged", df["usd pledged"].cast(FloatType()))
df = df.withColumn("usd_pledged_real", df["usd_pledged_real"].cast(FloatType()))
df = df.withColumn("usd_goal_real", df["usd_goal_real"].cast(FloatType()))

df.show(5)

+----------+--------------------+---------+-------------+--------+----------+-------+----------+-------+----------+-------+-------+-----------+----------------+-------------+-----+
|        ID|                name| category|main_category|currency|  deadline|   goal|  launched|pledged|     state|backers|country|usd pledged|usd_pledged_real|usd_goal_real|label|
+----------+--------------------+---------+-------------+--------+----------+-------+----------+-------+----------+-------+-------+-----------+----------------+-------------+-----+
|1000294559|Secular Solstice ...|Festivals|      Theater|     USD|2014-10-27| 7500.0|2014-10-02|8157.01|successful|    164|     US|    8157.01|         8157.01|       7500.0|    1|
|  10003650|Glyscian Debut Al...|     Rock|        Music|     USD|2012-05-26|15000.0|2012-03-27|  151.0|  canceled|      4|     US|      151.0|           151.0|      15000.0|    0|
|1000581546|      Project U-Neek|      Art|          Art|     GBP|2013-05-08| 1500.0|2013-04-08

In [22]:
# shape after data transformation
num_rows = df.count()
num_columns = len(df.columns)
shape = (num_rows, num_columns)

print(f"Shape of the DataFrame: {shape}")

Shape of the DataFrame: (375860, 16)


In [38]:
#query 1:  Top 5 Categories with the Highest Success Rate


from pyspark.sql.functions import col, when

# Add a new column for success (1 if success, 0 otherwise)
df = df.withColumn("success", when(col("state") == "successful", 1).otherwise(0))

# Group by main_category and calculate the success rate
success_rate = df.groupBy("main_category").agg((sum("success") / count("success")).alias("success_rate"))
top_categories = success_rate.orderBy(col("success_rate").desc()).limit(5)
top_categories.show()


# Visualization by using an interactive bar chart
import plotly.express as px
fig = px.bar(top_categories.toPandas(), x='main_category', y='success_rate', title='Top 5 Categories with the Highest Success Rate')
fig.show()



+-------------+-------------------+
|main_category|       success_rate|
+-------------+-------------------+
|        Dance| 0.6184245660881175|
|      Theater| 0.5969942836068597|
|       Comics| 0.5429050591633281|
|        Music|0.46764843202863593|
|          Art| 0.4112900333536563|
+-------------+-------------------+



Query 2:

Average Number of Backers by Main Category

In [39]:
# Clean the country column by filtering out non-alphabetic entries
#cleaned_df = df.filter(col("country").rlike("[a-zA-Z]"))

# Group by main_category and calculate the average number of backers
avg_backers_by_category = df.groupBy("main_category").agg(avg("backers").alias("avg_backers"))

# Order by average number of backers in descending order
avg_backers_by_category = avg_backers_by_category.orderBy(col("avg_backers").desc())
avg_backers_by_category.show()




+--------------------+-----------+
|       main_category|avg_backers|
+--------------------+-----------+
|           by Blule"|  102909.06|
|             Shelter|    77750.0|
|        & Destiny"""|    65500.9|
|       The Movement"|   27029.88|
| and universal so...|    25499.0|
|     and Burnouts"""|    22540.0|
|"" A Children's B...|   20123.47|
|             Redux!"|    16735.0|
|  I'll record a C...|    15220.0|
|         or Squeak!"|    15024.0|
| Maulana"": The S...|    13199.9|
| SYREN Modern Dance"|    12506.0|
| rhyming picture ...|    11001.0|
|          Apollo""!"|    10895.5|
|     Or Dead"" Film"|    10647.0|
|      Graphic Novel"|    10112.0|
| VICTORY"" PORPHY...|    10075.0|
|         and poetry"|     8230.0|
| and Sir Hound"" ...|     7830.0|
| is coming your w...|     7394.0|
+--------------------+-----------+
only showing top 20 rows



In [43]:
# Select the top 20 categories based on the average number of backers
top_20_categories = avg_backers_by_category.limit(20)

# Visualization using Sunburst Chart
import plotly.express as px

fig = px.sunburst(top_20_categories.toPandas(), path=['main_category'], values='avg_backers', title='Average Number of Backers by Main Category (Top 20)')
fig.show()


In [45]:
# Visualization using Treemap
import plotly.express as px

fig = px.treemap(avg_backers_by_category.toPandas(), path=['main_category'], values='avg_backers', title='Average Number of Backers by Main Category')
fig.show()


In [None]:
import seaborn as sns
# Query 3: Average pledged amount for successful projects by main category
query3 = df.filter(df.state == "successful").groupBy("main_category").avg("usd_pledged_real")
query3.show()
query3_pd = query3.toPandas()
# Visualization 3: Interactive Scatter Plot
fig = px.scatter(query3_pd, x='main_category', y='avg(usd_pledged_real)', color='main_category', title='Average Pledged Amount for Successful Projects by Main Category')
fig.show()

+-------------+---------------------+
|main_category|avg(usd_pledged_real)|
+-------------+---------------------+
|         Food|    18050.81322409646|
|          Art|    6925.822104362198|
|      Fashion|    20126.57219421118|
| Film & Video|    14071.19209427663|
|   Publishing|    9267.042699950787|
|       Crafts|    5272.591981675741|
|       Comics|   11597.742356049155|
|        Games|   53824.813477476615|
|        Music|    7219.049037242629|
|       Design|   61228.639399735264|
|  Photography|   10041.772509045762|
|   Technology|    90494.60017583462|
|   Journalism|     9695.56630185046|
|      Theater|    5935.421342126347|
|        Dance|    5422.770332885511|
+-------------+---------------------+



In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Combine features into a single vector column
featureCols = ["goal", "pledged", "usd pledged", "usd_pledged_real", "usd_goal_real"]
assembler = VectorAssembler(inputCols=featureCols, outputCol="features", handleInvalid="skip")
output = assembler.transform(df.dropna())

# Split data into train and test sets
trainData, testData = output.randomSplit([0.7, 0.3], seed=1234)

# Initialize models
lr = LogisticRegression(labelCol="label", featuresCol="features")
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
gbt = GBTClassifier(labelCol="label", featuresCol="features")

# Fit models
lrModel = lr.fit(trainData)
rfModel = rf.fit(trainData)
gbtModel = gbt.fit(trainData)

# Make predictions on the test data for all models
lr_predictions = lrModel.transform(testData)
rf_predictions = rfModel.transform(testData)
gbt_predictions = gbtModel.transform(testData)

# Evaluate models and print accuracies
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

lr_accuracy = evaluator.evaluate(lr_predictions)
rf_accuracy = evaluator.evaluate(rf_predictions)
gbt_accuracy = evaluator.evaluate(gbt_predictions)

print(f"Logistic Regression Accuracy: {lr_accuracy}")
print(f"Random Forest Accuracy: {rf_accuracy}")
print(f"Gradient Boosted Tree Accuracy: {gbt_accuracy}")


Logistic Regression Accuracy: 0.9805528094148237
Random Forest Accuracy: 0.9770181877276851
Gradient Boosted Tree Accuracy: 0.9858208854159614
