In [1]:
import os

In [2]:
!pwd

/content


In [3]:
!apt install openjdk-8-jdk-headless -qq

The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum fonts-ipafont-gothic
  fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  libxtst6 openjdk-8-jdk-headless openjdk-8-jre-headless
0 upgraded, 3 newly installed, 0 to remove and 24 not upgraded.
Need to get 39.7 MB of archives.
After this operation, 144 MB of additional disk space will be used.
Selecting previously unselected package libxtst6:amd64.
(Reading database ... 121666 files and directories currently installed.)
Preparing to unpack .../libxtst6_2%3a1.2.3-1build4_amd64.deb ...
Unpacking libxtst6:amd64 (2:1.2.3-1build4) ...
Selecting previously unselected package openjdk-8-jre-headless:amd64.
Preparing to unpack .../openjdk-8-jre-headless_8u392-ga-1~22.04_amd64.deb ...
Unpacking openjdk-8-jre-headless:amd64 (8u392-ga-1~22.04) ...
Selec

In [4]:
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64"

In [5]:
!echo $JAVA_HOME

/usr/lib/jvm/java-8-openjdk-amd64


In [6]:
!pip install pyspark==3.5.0

Collecting pyspark==3.5.0
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=15d693d790e55f010d14ba574eda58c0897b6ae4bc00e6dd07de032852c7d32e
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [16]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.functions import col, sum, when, isnan, lit
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.types import *

In [9]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import psutil
import time

In [41]:
spark = SparkSession.builder.master("local[4]").appName('ml-app-4').config('spark.ui.port', '4050').getOrCreate()

In [42]:
df = spark.read.csv('/content/train_full.csv',inferSchema=True,header=True)
df_test = spark.read.csv('/content/test_full.csv',inferSchema=True,header=True)

In [43]:
df = df.union(df_test)

In [44]:
print((df.count(),len(df.columns)))

(11306, 19)


In [45]:
def preprocess(data):
  start_time = time.time()

  data = data.withColumn("index", monotonically_increasing_id())
  data = data.drop(*['PassengerId', 'Name'])

  bool_data = data.select([col(c).cast("integer") for c in ["CryoSleep", "VIP", "Transported"]])
  bool_data = bool_data.withColumn("index", monotonically_increasing_id())

  bool_data = bool_data.withColumnRenamed("CryoSleep", "CryoSleep_value")
  bool_data = bool_data.withColumnRenamed("VIP", "VIP_value")
  bool_data = bool_data.withColumnRenamed("Transported", "Transported_value")

  combined_data = data.join(bool_data, on="index", how="inner")

  SI_HomePlanet = StringIndexer(inputCol='HomePlanet',outputCol='HomePlanet_Index')
  SI_Destination = StringIndexer(inputCol='Destination',outputCol='Destination_Index')
  SI_Lastname = StringIndexer(inputCol='Lastname',outputCol='Lastname_Index')
  SI_Deck = StringIndexer(inputCol='Deck',outputCol='Deck_Index')
  SI_Deck_side = StringIndexer(inputCol='Deck_side',outputCol='DeckSide_Index')
  ST_VRDeck = StringIndexer(inputCol="VRDeck", outputCol="VRDeck_index")
  ST_ShoppingMall = StringIndexer(inputCol="ShoppingMall", outputCol="ShoppingMall_index")
  ST_RoomService = StringIndexer(inputCol="RoomService", outputCol="RoomService_index")
  ST_Age = StringIndexer(inputCol="Age", outputCol="Age_index")
  ST_FoodCourt = StringIndexer(inputCol="FoodCourt", outputCol="FoodCourt_index")
  ST_Spa = StringIndexer(inputCol="Spa", outputCol="Spa_index")

  st = [SI_HomePlanet, SI_Destination, SI_Lastname, SI_Deck, SI_Deck_side, ST_VRDeck, ST_ShoppingMall, ST_RoomService, ST_Age, ST_FoodCourt, ST_Spa]

  for indexer in st:
    combined_data = indexer.fit(combined_data).transform(combined_data)

  pandas_df = combined_data.toPandas()

  combined_data = combined_data.withColumn(
    "RoomService_index",
    when(combined_data.RoomService_index > 9000, 9000).otherwise(combined_data.RoomService_index)
  )
  combined_data = combined_data.withColumn(
      "FoodCourt_index",
      when(combined_data.FoodCourt_index > 22000, 22000).otherwise(combined_data.RoomService_index)
  )
  combined_data = combined_data.withColumn(
      "ShoppingMall_index",
      when(combined_data.ShoppingMall_index > 11000, 11000).otherwise(combined_data.RoomService_index)
  )
  combined_data = combined_data.withColumn(
      "Spa_index",
      when(combined_data.Spa_index > 17000, 17000).otherwise(combined_data.RoomService_index)
  )
  combined_data = combined_data.withColumn(
      "VRDeck_index",
      when(combined_data.VRDeck_index > 21000, 21000).otherwise(combined_data.RoomService_index)
  )

  services = ['RoomService', 'FoodCourt', 'ShoppingMall', 'Spa', 'VRDeck']
  pandas_df['Total_expenses'] = pandas_df[services].sum(axis=1)
  Group_members = pandas_df.Group.value_counts().to_dict()
  pandas_df['Group_members'] = pandas_df.Group.map(Group_members)
  Cabin_members = pandas_df.Cabin.value_counts().to_dict()
  pandas_df['Cabin_members'] = pandas_df.Cabin.map(Cabin_members)

  pandas_df.Cabin_members.fillna(pandas_df.Cabin_members.mean(), inplace=True)
  col_drop = ['Cabin', 'Lastname']
  pandas_df = pandas_df.drop(col_drop, axis=1)

  spark_df = spark.createDataFrame(pandas_df)

  OHE = OneHotEncoder(inputCols=['Age_index', 'RoomService_index','FoodCourt_index','ShoppingMall_index','Spa_index','VRDeck_index','Group', "Cab_num", "HomePlanet_Index", "Destination_Index" , "Lastname_Index", "Deck_Index", "DeckSide_Index", "CryoSleep_value", "VIP_value"],
                    outputCols=['Age_index_OHE', 'RoomService_index_OHE','FoodCourt_index_OHE','ShoppingMall_index_OHE','Spa_index_OHE','VRDeck_index_OHE','Group_OHE', "Cab_num_OHE", "HomePlanet_Index_OHE", "Destination_Index_OHE" , "Lastname_Index_OHE", "Deck_Index_OHE",
                                "DeckSide_Index_OHE", "CryoSleep_value_OHE", "VIP_value_OHE"])

  combined_OHE_data = OHE.fit(spark_df).transform(spark_df)

  assembler = VectorAssembler(inputCols=[
 'Age',
 'RoomService',
 'FoodCourt',
 'ShoppingMall',
 'Spa',
 'VRDeck',
 'Group',
 'Cab_num',
 'HomePlanet_Index',
 'Destination_Index',
 'Lastname_Index',
 'Deck_Index',
 'DeckSide_Index',
 'CryoSleep_value',
 'VIP_value',
 'HomePlanet_Index_OHE',
 'Destination_Index_OHE',
 'Lastname_Index_OHE',
 'Deck_Index_OHE',
 'DeckSide_Index_OHE',
 'CryoSleep_value_OHE',
 'VIP_value_OHE'],
                           outputCol='features')

  data = combined_OHE_data.fillna(0)
  data = assembler.transform(data)
  model_df = data.select(["features", "Transported_value"])
  model_df = model_df.withColumnRenamed("Transported_value","label")

  end_time = time.time()
  elapsed_time = end_time - start_time
  cpu_percent = psutil.cpu_percent()
  virtual_memory_percent = psutil.virtual_memory().percent
  return [model_df, {
        'elapsed_time': elapsed_time,
        'cpu_percent': cpu_percent,
        'virtual_memory_percent': virtual_memory_percent
    }]

In [46]:
r = preprocess(df)

In [47]:
preprocessed_data = r[0]

In [48]:
preprocessed_data.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(2407,[0,6,8,10,1...|    0|
|(2407,[0,1,2,3,4,...|    1|
|(2407,[0,1,2,4,5,...|    0|
|(2407,[0,2,3,4,5,...|    0|
|(2407,[0,1,2,3,4,...|    1|
+--------------------+-----+
only showing top 5 rows



In [49]:
preprocessing_per = r[1]

In [50]:
preprocessing_per

{'elapsed_time': 12.976667642593384,
 'cpu_percent': 56.0,
 'virtual_memory_percent': 18.3}

In [51]:
def train(data):
  start_time = time.time()

  training_df,test_df = data.randomSplit([0.75,0.25])
  log_reg = LogisticRegression(featuresCol="features", labelCol="label")

  log_reg_model = log_reg.fit(training_df)
  lr_summary=log_reg_model.summary

  end_time = time.time()
  elapsed_time = end_time - start_time
  cpu_percent = psutil.cpu_percent()
  virtual_memory_percent = psutil.virtual_memory().percent

  return{
        'accuracy' : lr_summary.accuracy,
        'ROC' : lr_summary.areaUnderROC,
        'elapsed_time': elapsed_time,
        'cpu_percent': cpu_percent,
        'virtual_memory_percent': virtual_memory_percent
    }

In [52]:
t = train(preprocessed_data)

In [53]:
t

{'accuracy': 0.8346512451315945,
 'ROC': 0.9165465142291688,
 'elapsed_time': 7.704815149307251,
 'cpu_percent': 33.0,
 'virtual_memory_percent': 18.7}