In [None]:
!pip install pyspark

In [None]:
# Installs the same package as the `pip install pyspark` cell above, this time
# through `python -m pip`; running one of the two cells is enough.
!python -m pip install pyspark


In [None]:
# Logistic regression on the Titanic dataset using PySpark

In [2]:
from pyspark.sql import SparkSession

# Get the active Spark session, or start a new one registered under the
# application name 'Logis' if none exists yet.
spark = (
    SparkSession.builder
    .appName('Logis')
    .getOrCreate()
)

In [3]:
# Load the Titanic CSV: the first line supplies the column names and the
# column types are inferred from the data.
df = spark.read.csv(
    '/content/Titanic-Dataset.csv',
    header=True,
    inferSchema=True,
)

# Peek at the first record to confirm the file was parsed as expected.
df.head()

Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin=None, Embarked='S')

In [4]:
# Print the first rows of the loaded DataFrame as a text table.
df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
|          6|       0|     3|    Moran, Mr. James|  male|NULL|    0|    0|      

In [5]:
# Print every column's name, inferred type and nullability.
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [6]:
# List the column names of the loaded DataFrame.
df.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [8]:
# Keep the label ('Survived') plus the columns that are later assembled into
# the feature vector; every other column of `df` is left out.
rm_columns = df.select(
    'Survived', 'Pclass', 'Sex', 'Age',
    'SibSp', 'Parch', 'Fare', 'Embarked',
)

# Drop the rows that contain a null in any of the kept columns.
result = rm_columns.na.drop()

# Show the cleaned data.
result.show()

+--------+------+------+----+-----+-----+-------+--------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|
+--------+------+------+----+-----+-----+-------+--------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|
|       1|     1|female|38.0|    1|    0|71.2833|       C|
|       1|     3|female|26.0|    0|    0|  7.925|       S|
|       1|     1|female|35.0|    1|    0|   53.1|       S|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|
|       0|     1|  male|54.0|    0|    0|51.8625|       S|
|       0|     3|  male| 2.0|    3|    1| 21.075|       S|
|       1|     3|female|27.0|    0|    2|11.1333|       S|
|       1|     2|female|14.0|    1|    0|30.0708|       C|
|       1|     3|female| 4.0|    1|    1|   16.7|       S|
|       1|     1|female|58.0|    0|    0|  26.55|       S|
|       0|     3|  male|20.0|    0|    0|   8.05|       S|
|       0|     3|  male|39.0|    1|    5| 31.275|       S|
|       0|     3|female|14.0|    0|    0| 7.8542|       

In [9]:
# Feature transformers for the Titanic pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder

# 'Sex': string category -> numeric index -> one-hot vector
sexIdx = StringIndexer(inputCol='Sex', outputCol='SexIndex')
sexEncode = OneHotEncoder(inputCol='SexIndex', outputCol='SexVec')

# 'Embarked': string category -> numeric index -> one-hot vector
embarkIdx = StringIndexer(inputCol='Embarked', outputCol='EmbarkIndex')
embarkEncode = OneHotEncoder(inputCol='EmbarkIndex', outputCol='EmbarkVec')

# Pack the numeric columns and the two encoded vectors into one "features"
# column, which is the input of the classifier.
assembler = VectorAssembler(
    inputCols=['Pclass', 'SexVec', 'Age', 'SibSp', 'Parch', 'Fare', 'EmbarkVec'],
    outputCol='features',
)

In [10]:
# Pipeline and classifier
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

# Logistic regression that learns 'Survived' from the assembled 'features' vector.
log_reg = LogisticRegression(labelCol='Survived', featuresCol='features')

# Stages run in the listed order: both indexers first, then both encoders
# (which read the index columns), then the assembler, then the classifier.
pipe = Pipeline(stages=[
    sexIdx,
    embarkIdx,
    sexEncode,
    embarkEncode,
    assembler,
    log_reg,
])

In [12]:
# Split the cleaned data into train and test with weights 0.7 / 0.3.
# randomSplit samples rows at random, so the seed is fixed to make the split,
# and therefore the fitted model and the predictions below, repeatable
# from one run of the notebook to the next.
train_data, test_data = result.randomSplit([0.7, 0.3], seed=42)

# Fit every pipeline stage (indexers, encoders, assembler, classifier)
# on the training split only.
fit_model = pipe.fit(train_data)

# Apply the fitted pipeline to the test split; this appends the intermediate
# feature columns plus rawPrediction, probability and prediction.
results = fit_model.transform(test_data)

# Show the predictions next to the input columns.
results.show()

+--------+------+------+----+-----+-----+--------+--------+--------+-----------+-------------+-------------+--------------------+--------------------+--------------------+----------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|    Fare|Embarked|SexIndex|EmbarkIndex|       SexVec|    EmbarkVec|            features|       rawPrediction|         probability|prediction|
+--------+------+------+----+-----+-----+--------+--------+--------+-----------+-------------+-------------+--------------------+--------------------+--------------------+----------+
|       0|     1|  male|24.0|    0|    1|247.5208|       C|     0.0|        1.0|(1,[0],[1.0])|(2,[1],[1.0])|[1.0,1.0,24.0,0.0...|[-1.0891880426766...|[0.25177120608452...|       1.0|
|       0|     1|  male|29.0|    0|    0|    30.0|       S|     0.0|        0.0|(1,[0],[1.0])|(2,[0],[1.0])|[1.0,1.0,29.0,0.0...|[-0.2007235881568...|[0.44998690909268...|       1.0|
|       0|     1|  male|29.0|    1|    0|    66.6|       S|     0.0|        0.0|(1,[0

In [None]:
# Pipeline example on a small hand-made DataFrame

In [15]:
import pyspark

# findspark is optional here: `import pyspark` above already succeeds, so the
# package is importable without it. The cell that installs findspark comes
# later in the notebook, so an unguarded import would fail when the notebook
# is run top to bottom on a fresh kernel.
try:
    import findspark
except ImportError:
    pass
else:
    findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder , VectorAssembler
from pyspark.ml.classification import LogisticRegression

In [14]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [16]:
# create a sample dataframe with 4 features and 1 label column
sample_data_train = spark.createDataFrame([
    (2.0, 'A', 'S10', 40, 1.0),
    (1.0, 'X', 'E10', 25, 1.0),
    (4.0, 'X', 'S20', 10, 0.0),
    (3.0, 'Z', 'S10', 20, 0.0),
    (4.0, 'A', 'E10', 30, 1.0),
    (2.0, 'Z', 'S10', 40, 0.0),
    (5.0, 'X', 'D10', 10, 1.0),
], ['feature_1', 'feature_2', 'feature_3', 'feature_4', 'label'])

# view the data
sample_data_train.show()

+---------+---------+---------+---------+-----+
|feature_1|feature_2|feature_3|feature_4|label|
+---------+---------+---------+---------+-----+
|      2.0|        A|      S10|       40|  1.0|
|      1.0|        X|      E10|       25|  1.0|
|      4.0|        X|      S20|       10|  0.0|
|      3.0|        Z|      S10|       20|  0.0|
|      4.0|        A|      E10|       30|  1.0|
|      2.0|        Z|      S10|       40|  0.0|
|      5.0|        X|      D10|       10|  1.0|
+---------+---------+---------+---------+-----+



In [17]:
# Importing the required libraries
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# NOTE: the assembler and the prediction frame get sample-specific names so
# this cell does not rebind `assembler` and `result`, which the Titanic cells
# earlier in the notebook define and still rely on when they are re-run.

# feature_2: string category -> numeric index -> one-hot vector
feature2Idx = StringIndexer(inputCol='feature_2', outputCol='feature2Idx')
feature2Enc = OneHotEncoder(inputCol='feature2Idx', outputCol='feature2Vec')

# feature_3: string category -> numeric index -> one-hot vector
feature3Idx = StringIndexer(inputCol='feature_3', outputCol='feature3Idx')
feature3Enc = OneHotEncoder(inputCol='feature3Idx', outputCol='feature3Vec')

# Pack the two numeric columns and the two encoded vectors into 'features'
sample_assembler = VectorAssembler(
    inputCols=['feature_1', 'feature2Vec', 'feature3Vec', 'feature_4'],
    outputCol='features')

# Logistic regression that learns 'label' from the 'features' vector
lr = LogisticRegression(featuresCol='features', labelCol='label')

# Each indexer precedes the encoder that reads its output; the assembler and
# the classifier come last.
pipeline = Pipeline(stages=[feature2Idx, feature2Enc,
                            feature3Idx, feature3Enc,
                            sample_assembler, lr])

# Fit on the sample data and score the same rows. There is no held-out split
# here, so the predictions only illustrate the pipeline output.
model = pipeline.fit(sample_data_train)
sample_predictions = model.transform(sample_data_train)

# View the assembled features next to the label and the model outputs
sample_predictions.select('features', 'label', 'rawPrediction', 'probability', 'prediction').show()


+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[2.0,0.0,1.0,1.0,...|  1.0|[-18.225955524188...|[1.21497707416456...|       1.0|
|[1.0,1.0,0.0,0.0,...|  1.0|[-18.358243476371...|[1.06442767136925...|       1.0|
|(7,[0,1,6],[4.0,1...|  0.0|[18.3563077812100...|[0.99999998933509...|       0.0|
|(7,[0,3,6],[3.0,1...|  0.0|[27.4123701891423...|[0.99999999999875...|       0.0|
|[4.0,0.0,1.0,0.0,...|  1.0|[-35.975077024041...|[2.37805865550653...|       1.0|
|(7,[0,3,6],[2.0,1...|  0.0|[18.2316263544839...|[0.99999998791893...|       0.0|
|[5.0,1.0,0.0,0.0,...|  1.0|[-19.243972103991...|[4.38984416425464...|       1.0|
+--------------------+-----+--------------------+--------------------+----------+

