In [None]:
import pyspark
from pyspark import SparkContext
import os
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType
from pyspark.sql.functions import split, count, when, isnan, col, regexp_replace, max
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.linalg import Vectors



In [None]:
# Mount Google Drive at /content/drive so files stored there can be read by path.
# force_remount=True requests a fresh mount even if one is already present.
from google.colab import drive
drive.mount('/content/drive',force_remount=True)

Mounted at /content/drive


In [None]:
# Reuse the SparkContext that is already running, or start one if none exists.
sc = SparkContext.getOrCreate()
# Show the master and application name the context is bound to.
print(sc)

<SparkContext master=local[*] appName=pyspark-shell>


## Difference between an RDD and a Spark Dataset

An RDD has no schema: it exposes the data as a list of plain strings. In a Spark Dataset, by contrast, the schema is maintained.

In [None]:
# Path of the test CSV on the mounted Drive.
file_path='/content/drive/MyDrive/data/test.csv'

In [None]:
# Read the CSV as an RDD of raw text lines: one plain string per line, no schema.
rdd= sc.textFile(file_path)
# Bring every line back to the driver to inspect the raw content (header included).
rdd.collect()

['PassengerId,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked',
 '892,3,"Kelly, Mr. James",male,34.5,0,0,330911,7.8292,,Q',
 '893,3,"Wilkes, Mrs. James (Ellen Needs)",female,47,1,0,363272,7,,S',
 '894,2,"Myles, Mr. Thomas Francis",male,62,0,0,240276,9.6875,,Q',
 '895,3,"Wirz, Mr. Albert",male,27,0,0,315154,8.6625,,S',
 '896,3,"Hirvonen, Mrs. Alexander (Helga E Lindqvist)",female,22,1,1,3101298,12.2875,,S',
 '897,3,"Svensson, Mr. Johan Cervin",male,14,0,0,7538,9.225,,S',
 '898,3,"Connolly, Miss. Kate",female,30,0,0,330972,7.6292,,Q',
 '899,2,"Caldwell, Mr. Albert Francis",male,26,1,1,248738,29,,S',
 '900,3,"Abrahim, Mrs. Joseph (Sophie Halaut Easu)",female,18,0,0,2657,7.2292,,C',
 '901,3,"Davies, Mr. John Samuel",male,21,2,0,A/4 48871,24.15,,S',
 '902,3,"Ilieff, Mr. Ylio",male,,0,0,349220,7.8958,,S',
 '903,1,"Jones, Mr. Charles Cresson",male,46,0,0,694,26,,S',
 '904,1,"Snyder, Mrs. John Pillsbury (Nelle Stevenson)",female,23,1,0,21228,82.2667,B45,S',
 '905,2,"Howard, Mr. Benj

In [None]:
# Naive tokenisation: split each raw line on ',' and count how often every
# token occurs. Quoted fields that contain a comma (the passenger names) are
# broken into pieces by this approach, and the header tokens are counted too.
(
    rdd
    .flatMap(lambda line: line.split(','))
    .map(lambda token: (token, 1))
    .reduceByKey(lambda left, right: left + right)
    .collect()
)

[('PassengerId', 1),
 ('Pclass', 1),
 ('SibSp', 1),
 ('Parch', 1),
 ('Fare', 1),
 ('Cabin', 1),
 ('3', 226),
 ('male', 266),
 ('0', 609),
 ('', 414),
 ('Q', 46),
 ('"Wilkes', 1),
 ('female', 152),
 ('7', 3),
 ('S', 270),
 ('"Myles', 1),
 (' Mr. Thomas Francis"', 2),
 ('62', 1),
 ('240276', 1),
 (' Mr. Albert"', 1),
 ('896', 1),
 (' Mrs. Alexander (Helga E Lindqvist)"', 1),
 ('22', 16),
 ('3101298', 1),
 ('12.2875', 1),
 ('"Svensson', 1),
 ('14', 2),
 ('9.225', 1),
 ('"Connolly', 1),
 ('330972', 1),
 ('"Caldwell', 1),
 ('26', 31),
 ('29', 11),
 ('900', 1),
 ('18', 14),
 ('2657', 1),
 ('7.2292', 9),
 ('901', 1),
 (' Mr. John Samuel"', 1),
 ('21', 25),
 ('24.15', 1),
 ('"Ilieff', 1),
 ('"Jones', 1),
 ('694', 1),
 ('82.2667', 2),
 ('B45', 2),
 ('905', 1),
 (' Mr. Benjamin"', 1),
 ('906', 1),
 ('"Chaffee', 1),
 ('W.E.P. 5734', 1),
 ('61.175', 1),
 ('907', 1),
 ('"del Carlo', 1),
 (' Mrs. Sebastiano (Argenia Genovesi)"', 1),
 ('24', 17),
 ('SC/PARIS 2167', 1),
 ('27.7208', 6),
 ('908', 1),
 

In [None]:
import csv
from io import StringIO

In [None]:
def parse_csv(line):
  """Parse one CSV-formatted text line into a list of field strings.

  The csv module is used instead of str.split so that a quoted field
  containing commas (for example '"Kelly, Mr. James"') stays a single
  field and loses its surrounding quotes.

  Raises IndexError when the line yields no CSV record (e.g. an empty string).
  """
  reader = csv.reader(StringIO(line))
  records = list(reader)
  return records[0]

# Drop the header: keep only the lines that differ from the file's first line
header = rdd.first()
data = rdd.filter(lambda line: line != header)

# Parse every remaining line with the csv-aware parser
parsed_data = data.map(parse_csv)

# Flatten the rows into individual field values and count each value's occurrences
word_count = (
    parsed_data
    .flatMap(lambda fields: fields)
    .map(lambda value: (value, 1))
    .reduceByKey(lambda left, right: left + right)
)

word_count.collect()

[('3', 226),
 ('Kelly, Mr. James', 1),
 ('male', 266),
 ('0', 609),
 ('', 414),
 ('Q', 46),
 ('Wilkes, Mrs. James (Ellen Needs)', 1),
 ('female', 152),
 ('7', 3),
 ('S', 270),
 ('Myles, Mr. Thomas Francis', 1),
 ('62', 1),
 ('240276', 1),
 ('Wirz, Mr. Albert', 1),
 ('896', 1),
 ('Hirvonen, Mrs. Alexander (Helga E Lindqvist)', 1),
 ('22', 16),
 ('3101298', 1),
 ('12.2875', 1),
 ('14', 2),
 ('9.225', 1),
 ('330972', 1),
 ('Caldwell, Mr. Albert Francis', 1),
 ('26', 31),
 ('29', 11),
 ('900', 1),
 ('18', 14),
 ('2657', 1),
 ('7.2292', 9),
 ('901', 1),
 ('Davies, Mr. John Samuel', 1),
 ('21', 25),
 ('24.15', 1),
 ('Ilieff, Mr. Ylio', 1),
 ('Jones, Mr. Charles Cresson', 1),
 ('694', 1),
 ('Snyder, Mrs. John Pillsbury (Nelle Stevenson)', 1),
 ('82.2667', 2),
 ('B45', 2),
 ('905', 1),
 ('906', 1),
 ('W.E.P. 5734', 1),
 ('61.175', 1),
 ('907', 1),
 ('24', 17),
 ('SC/PARIS 2167', 1),
 ('27.7208', 6),
 ('908', 1),
 ('Keane, Mr. Daniel', 1),
 ('233734', 1),
 ('7.225', 9),
 ('910', 1),
 ('Ilmakang

In [None]:
# Intermediate step of the word count: every field value paired with 1,
# shown before reduceByKey sums the pairs per value.
(
    parsed_data
    .flatMap(lambda fields: fields)
    .map(lambda value: (value, 1))
    .collect()
)

[('892', 1),
 ('3', 1),
 ('Kelly, Mr. James', 1),
 ('male', 1),
 ('34.5', 1),
 ('0', 1),
 ('0', 1),
 ('330911', 1),
 ('7.8292', 1),
 ('', 1),
 ('Q', 1),
 ('893', 1),
 ('3', 1),
 ('Wilkes, Mrs. James (Ellen Needs)', 1),
 ('female', 1),
 ('47', 1),
 ('1', 1),
 ('0', 1),
 ('363272', 1),
 ('7', 1),
 ('', 1),
 ('S', 1),
 ('894', 1),
 ('2', 1),
 ('Myles, Mr. Thomas Francis', 1),
 ('male', 1),
 ('62', 1),
 ('0', 1),
 ('0', 1),
 ('240276', 1),
 ('9.6875', 1),
 ('', 1),
 ('Q', 1),
 ('895', 1),
 ('3', 1),
 ('Wirz, Mr. Albert', 1),
 ('male', 1),
 ('27', 1),
 ('0', 1),
 ('0', 1),
 ('315154', 1),
 ('8.6625', 1),
 ('', 1),
 ('S', 1),
 ('896', 1),
 ('3', 1),
 ('Hirvonen, Mrs. Alexander (Helga E Lindqvist)', 1),
 ('female', 1),
 ('22', 1),
 ('1', 1),
 ('1', 1),
 ('3101298', 1),
 ('12.2875', 1),
 ('', 1),
 ('S', 1),
 ('897', 1),
 ('3', 1),
 ('Svensson, Mr. Johan Cervin', 1),
 ('male', 1),
 ('14', 1),
 ('0', 1),
 ('0', 1),
 ('7538', 1),
 ('9.225', 1),
 ('', 1),
 ('S', 1),
 ('898', 1),
 ('3', 1),
 ('Conn

In [None]:
# Path of the train CSV on the mounted Drive.
file_path1='/content/drive/MyDrive/data/train.csv'
# Read the train file as an RDD of raw text lines.
rdd_train= sc.textFile(file_path1)
# Inspect the raw lines; compared with the test file the header has an extra Survived column.
rdd_train.collect()

['PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked',
 '1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S',
 '2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C',
 '3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S',
 '4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S',
 '5,0,3,"Allen, Mr. William Henry",male,35,0,0,373450,8.05,,S',
 '6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q',
 '7,0,1,"McCarthy, Mr. Timothy J",male,54,0,0,17463,51.8625,E46,S',
 '8,0,3,"Palsson, Master. Gosta Leonard",male,2,3,1,349909,21.075,,S',
 '9,1,3,"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)",female,27,0,2,347742,11.1333,,S',
 '10,1,2,"Nasser, Mrs. Nicholas (Adele Achem)",female,14,1,0,237736,30.0708,,C',
 '11,1,3,"Sandstrom, Miss. Marguerite Rut",female,4,1,1,PP 9549,16.7,G6,S',
 '12,1,1,"Bonnell, Miss. Elizabeth",female,58,0,0,113783,26.55,C103,S',
 '

In [None]:
# Drop the header: keep only the lines that differ from the train file's first line
header_train = rdd_train.first()
data_train = rdd_train.filter(lambda line: line != header_train)

# Parse every remaining line with the csv-aware parser
parsed_data_train = data_train.map(parse_csv)

# Flatten the rows into individual field values and count each value's occurrences
word_count_train = (
    parsed_data_train
    .flatMap(lambda fields: fields)
    .map(lambda value: (value, 1))
    .reduceByKey(lambda left, right: left + right)
)

word_count_train.collect()

[('0', 1850),
 ('3', 519),
 ('male', 577),
 ('22', 28),
 ('A/5 21171', 1),
 ('', 866),
 ('S', 644),
 ('female', 314),
 ('38', 12),
 ('71.2833', 1),
 ('26', 50),
 ('4', 33),
 ('Futrelle, Mrs. Jacques Heath (Lily May Peel)', 1),
 ('113803', 2),
 ('53.1', 5),
 ('C123', 2),
 ('5', 16),
 ('Allen, Mr. William Henry', 1),
 ('373450', 1),
 ('8.05', 43),
 ('6', 5),
 ('Moran, Mr. James', 1),
 ('330877', 1),
 ('8.4583', 1),
 ('Q', 77),
 ('7', 4),
 ('McCarthy, Mr. Timothy J', 1),
 ('17463', 1),
 ('E46', 1),
 ('349909', 4),
 ('21.075', 4),
 ('Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)', 1),
 ('347742', 3),
 ('10', 3),
 ('Nasser, Mrs. Nicholas (Adele Achem)', 1),
 ('14', 8),
 ('30.0708', 2),
 ('PP 9549', 2),
 ('G6', 4),
 ('58', 6),
 ('113783', 1),
 ('26.55', 15),
 ('C103', 1),
 ('13', 45),
 ('A/5. 2151', 1),
 ('347082', 7),
 ('15', 7),
 ('350406', 1),
 ('7.8542', 13),
 ('16', 19),
 ('Hewlett, Mrs. (Mary D Kingcome) ', 1),
 ('55', 5),
 ('248706', 1),
 ('17', 14),
 ('382652', 5),
 ('18', 30),
 

### Q1. How many passenger records do we have? [from the test dataset]

In [None]:
# Each element of parsed_data is one parsed passenger row (the header line was
# filtered out before parsing), so the number of rows is the number of
# passengers. Counting directly avoids filtering on the name field, which
# would silently leave out any row whose name is empty.
parsed_data.count()

418

### Q2. How many male and female passengers survived?

In [None]:
# Count passengers per value of the Sex field (index 3 in the test file).
# The result is stored as `sex_counts` rather than `count`, so it does not
# overwrite the `count` function imported from pyspark.sql.functions.
sex_counts = (
    parsed_data
    .map(lambda row: (row[3], 1))
    .reduceByKey(lambda left, right: left + right)
    .collect()
)
# The list holds both sexes, so the label says so.
print(f"Number of passengers by sex: {sex_counts}")

Number of male passengers: [('male', 266), ('female', 152)]


* This is the result for the test dataset, which has no `Survived` column, so it only counts passengers by sex.

In [None]:
# Survivors per sex in the train file: each row contributes 1 when its
# Survived field (index 1) equals 1 and 0 otherwise, keyed by Sex (index 4).
(
    parsed_data_train
    .map(lambda row: (row[4], 1 if float(row[1]) == 1 else 0))
    .reduceByKey(lambda left, right: left + right)
    .collect()
)

[('male', 109), ('female', 233)]

* This is the result for the train dataset, which does have a `Survived` column.

## Q3. Max Fare

In [None]:
# Highest fare in the test file (Fare is field index 8); an empty fare is read as 0.0.
max_value = (
    parsed_data
    .map(lambda row: float(row[8]) if row[8] else 0.0)
    .max()
)
print(f"Maximum value: {max_value}")



Maximum value: 512.3292


In [None]:
#parsed_data.map(lambda x:x[8]).max()

## Q4. Display the number of passengers in each class.

In [None]:
# Number of passengers per Pclass value (field index 1 in the test file).
(
    parsed_data
    .map(lambda row: (row[1], 1))
    .reduceByKey(lambda left, right: left + right)
    .collect()
)

[('3', 218), ('2', 93), ('1', 107)]

## Q5. Find out the passenger who bought the most expensive ticket

In [None]:
# (name, fare) of one passenger holding the highest fare in the train file.
# Name is field index 3 and Fare is index 9; an empty fare is read as 0.0.
imp_train = (
    parsed_data_train
    .map(lambda row: (row[3], float(row[9]) if row[9] else 0.0))
    .max(key=lambda pair: pair[1])
)
print(imp_train)

# List every passenger who paid that same fare. The fares are compared as
# numbers, using the same conversion as above: comparing the raw text with
# str(float) would miss equal fares that are written differently
# (e.g. '263' vs '263.0').
(
    parsed_data_train
    .filter(lambda row: (float(row[9]) if row[9] else 0.0) == imp_train[1])
    .map(lambda row: (row[3], row[9]))
    .collect()
)

('Ward, Miss. Anna', 512.3292)


[('Ward, Miss. Anna', '512.3292'),
 ('Cardeza, Mr. Thomas Drake Martinez', '512.3292'),
 ('Lesurer, Mr. Gustave J', '512.3292')]

In [None]:

# (name, fare) of one passenger holding the highest fare in the test file.
# Name is field index 2 and Fare is index 8; an empty fare is read as 0.0.
imp = (
    parsed_data
    .map(lambda row: (row[2], float(row[8]) if row[8] else 0.0))
    .max(key=lambda pair: pair[1])
)
print(imp)

('Cardeza, Mrs. James Warburton Martinez (Charlotte Wardle Drake)', 512.3292)


## Q6. Use the passenger's name to display the details of that passenger



In [None]:
# Full record(s) of the passenger whose name (field index 2) matches the
# name stored in `imp`.
(
    parsed_data
    .filter(lambda row: row[2] == imp[0])
    .collect()
)

[['1235',
  '1',
  'Cardeza, Mrs. James Warburton Martinez (Charlotte Wardle Drake)',
  'female',
  '58',
  '0',
  '1',
  'PC 17755',
  '512.3292',
  'B51 B53 B55',
  'C']]

## Q7. Find the average age of the passengers

In [None]:
# Average age over the passengers whose Age field (index 4) is present.
# Rows with a missing age are excluded: counting them as 0.0 would add
# fictitious zero-year-olds and pull the average down.
(
    parsed_data
    .filter(lambda row: row[4].strip())
    .map(lambda row: float(row[4]))
    .mean()
)

24.044258373205736

## Q8. Count the number of passengers who embarked from each port

In [None]:
# Passengers per embarkation port (Embarked is field index 10).
# Step 1 drops rows whose Embarked value is empty or whitespace only,
# step 2 maps each remaining row to (port, 1), step 3 sums per port.
(
    parsed_data
    .filter(lambda row: row[10] and row[10].strip())
    .map(lambda row: (row[10], 1))
    .reduceByKey(lambda left, right: left + right)
    .collect()
)

[('Q', 46), ('S', 270), ('C', 102)]

## Q9. Sort the data by passenger age

In [None]:
# Rows ordered by ascending age (Age is field index 4). Rows with an empty
# or whitespace-only age are removed first so float() always gets a number.
(
    parsed_data
    .filter(lambda row: row[4] and row[4].strip())
    .sortBy(lambda row: float(row[4]))
    .collect()
)

[['1246',
  '3',
  'Dean, Miss. Elizabeth Gladys Millvina""',
  'female',
  '0.17',
  '1',
  '2',
  'C.A. 2315',
  '20.575',
  '',
  'S'],
 ['1093',
  '3',
  'Danbom, Master. Gilbert Sigvard Emanuel',
  'male',
  '0.33',
  '0',
  '2',
  '347080',
  '14.4',
  '',
  'S'],
 ['1173',
  '3',
  'Peacock, Master. Alfred Edward',
  'male',
  '0.75',
  '1',
  '1',
  'SOTON/O.Q. 3101315',
  '13.775',
  '',
  'S'],
 ['1199',
  '3',
  'Aks, Master. Philip Frank',
  'male',
  '0.83',
  '0',
  '1',
  '392091',
  '9.35',
  '',
  'S'],
 ['1142',
  '2',
  'West, Miss. Barbara J',
  'female',
  '0.92',
  '1',
  '2',
  'C.A. 34651',
  '27.75',
  '',
  'S'],
 ['1009',
  '3',
  'Sandstrom, Miss. Beatrice Irene',
  'female',
  '1',
  '1',
  '1',
  'PP 9549',
  '16.7',
  'G6',
  'S'],
 ['1155',
  '3',
  'Klasen, Miss. Gertrud Emilia',
  'female',
  '1',
  '1',
  '1',
  '350405',
  '12.1833',
  '',
  'S'],
 ['1188',
  '2',
  'Laroche, Miss. Louise',
  'female',
  '1',
  '1',
  '2',
  'SC/Paris 2123',
  '41.57

## Join

In [None]:
# First pair RDD: key 201 occurs twice, key 505 has no match in the second RDD
c = (
    (201, 'pune'),
    (301, 'kol'),
    (201, 'Mum'),
    (402, 'jaipur'),
    (505, 'HTM'),
)
d = sc.parallelize(c)

# Second pair RDD: key 603 has no match in the first RDD
b = (
    (201, 'navin'),
    (301, 'kumar'),
    (402, 'pal'),
    (603, 'kk'),
)
a = sc.parallelize(b)

# Join on the key: the result pairs up values only for keys present in both RDDs
tupple_join = d.join(a)
tupple_join.collect()

[(201, ('pune', 'navin')),
 (201, ('Mum', 'navin')),
 (301, ('kol', 'kumar')),
 (402, ('jaipur', 'pal'))]