In [6]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In Apache Spark, SparkSession is the entry point for programming Spark with the Dataset and DataFrame API. 

The SparkSession is a unified entry point for users to interact with Spark data, and it replaces the earlier SQLContext and HiveContext in Spark 2.x. It is the starting point for any Spark functionality and enables the user to create a Spark DataFrame and Dataset and execute SQL queries against them.

In [7]:
from pyspark.sql import SparkSession

In [8]:
spark=SparkSession.builder.appName('test').getOrCreate()

In [9]:
spark

the local[*] URL tells Spark to run in local mode, using all available cores on the machine where the application is running.

In [10]:
# Now you can use SparkSession to create DataFrames and execute SQL queries
df = spark.read.csv("diabetes.csv")
df.show()

+-----------+-------+-------------+-------------+-------+----+--------------------+---+-------+
|        _c0|    _c1|          _c2|          _c3|    _c4| _c5|                 _c6|_c7|    _c8|
+-----------+-------+-------------+-------------+-------+----+--------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeF...|Age|Outcome|
|          6|    148|           72|           35|      0|33.6|               0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|               0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|               0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|               0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|               2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|               0.201| 30|      0|
|          3|     78|           50|     

set header=True.

In [11]:
df = spark.read.csv("diabetes.csv",header=True)
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|  31|                   0.248| 26|      1|


In [12]:
type(df)

pyspark.sql.dataframe.DataFrame

If you don't set inferSchema=True when reading a CSV file in PySpark, the schema of the resulting DataFrame will be inferred as string types for all columns.

When reading a CSV file, PySpark infers the schema of the DataFrame by inspecting the contents of the file. If inferSchema is set to True, PySpark will automatically try to guess the data types of each column in the CSV file.

In [13]:
# Print the schema of the DataFrame
df.printSchema()

root
 |-- Pregnancies: string (nullable = true)
 |-- Glucose: string (nullable = true)
 |-- BloodPressure: string (nullable = true)
 |-- SkinThickness: string (nullable = true)
 |-- Insulin: string (nullable = true)
 |-- BMI: string (nullable = true)
 |-- DiabetesPedigreeFunction: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Outcome: string (nullable = true)



if nullable is set to True, then the column can contain null values. If nullable is set to False, then the column cannot contain null values.

In [14]:
# Read a CSV file and infer the schema
df = spark.read.csv("diabetes.csv", header=True, inferSchema=True)
# Show the contents of the DataFrame
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


In [15]:
# Print the schema of the DataFrame
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [16]:
df.dtypes

[('Pregnancies', 'int'),
 ('Glucose', 'int'),
 ('BloodPressure', 'int'),
 ('SkinThickness', 'int'),
 ('Insulin', 'int'),
 ('BMI', 'double'),
 ('DiabetesPedigreeFunction', 'double'),
 ('Age', 'int'),
 ('Outcome', 'int')]

In [17]:
l=df.dtypes
for column_name,column_type in l:
  print(f"Type of {column_name} : {column_type}")

Type of Pregnancies : int
Type of Glucose : int
Type of BloodPressure : int
Type of SkinThickness : int
Type of Insulin : int
Type of BMI : double
Type of DiabetesPedigreeFunction : double
Type of Age : int
Type of Outcome : int


In [18]:
df.schema

StructType([StructField('Pregnancies', IntegerType(), True), StructField('Glucose', IntegerType(), True), StructField('BloodPressure', IntegerType(), True), StructField('SkinThickness', IntegerType(), True), StructField('Insulin', IntegerType(), True), StructField('BMI', DoubleType(), True), StructField('DiabetesPedigreeFunction', DoubleType(), True), StructField('Age', IntegerType(), True), StructField('Outcome', IntegerType(), True)])

In [19]:
df.schema['Pregnancies']

StructField('Pregnancies', IntegerType(), True)

In [20]:
df.schema['Pregnancies'].dataType

IntegerType()

In [21]:
df.head()

Row(Pregnancies=6, Glucose=148, BloodPressure=72, SkinThickness=35, Insulin=0, BMI=33.6, DiabetesPedigreeFunction=0.627, Age=50, Outcome=1)

In [22]:
df.head(3)

[Row(Pregnancies=6, Glucose=148, BloodPressure=72, SkinThickness=35, Insulin=0, BMI=33.6, DiabetesPedigreeFunction=0.627, Age=50, Outcome=1),
 Row(Pregnancies=1, Glucose=85, BloodPressure=66, SkinThickness=29, Insulin=0, BMI=26.6, DiabetesPedigreeFunction=0.351, Age=31, Outcome=0),
 Row(Pregnancies=8, Glucose=183, BloodPressure=64, SkinThickness=0, Insulin=0, BMI=23.3, DiabetesPedigreeFunction=0.672, Age=32, Outcome=1)]

In [23]:
df.columns

['Pregnancies',
 'Glucose',
 'BloodPressure',
 'SkinThickness',
 'Insulin',
 'BMI',
 'DiabetesPedigreeFunction',
 'Age',
 'Outcome']

In [24]:
df.select('Outcome')

DataFrame[Outcome: int]

In [25]:
df.select('Outcome').show()

+-------+
|Outcome|
+-------+
|      1|
|      0|
|      1|
|      0|
|      1|
|      0|
|      1|
|      0|
|      1|
|      1|
|      0|
|      1|
|      0|
|      1|
|      1|
|      1|
|      1|
|      1|
|      0|
|      1|
+-------+
only showing top 20 rows



In [26]:
df.select(['Pregnancies','Outcome'])

DataFrame[Pregnancies: int, Outcome: int]

In [27]:
df.select(['Pregnancies','Outcome']).show()

+-----------+-------+
|Pregnancies|Outcome|
+-----------+-------+
|          6|      1|
|          1|      0|
|          8|      1|
|          1|      0|
|          0|      1|
|          5|      0|
|          3|      1|
|         10|      0|
|          2|      1|
|          8|      1|
|          4|      0|
|         10|      1|
|         10|      0|
|          1|      1|
|          5|      1|
|          7|      1|
|          0|      1|
|          7|      1|
|          1|      0|
|          1|      1|
+-----------+-------+
only showing top 20 rows



In [28]:
df.describe().show()

+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|summary|       Pregnancies|          Glucose|     BloodPressure|     SkinThickness|           Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|  count|               768|              768|               768|               768|               768|               768|                     768|               768|               768|
|   mean|3.8450520833333335|     120.89453125|       69.10546875|20.536458333333332| 79.79947916666667|31.992578124999977|      0.4718763020833327|33.240885416666664|0.3489583333333333|
| stddev|  3.36957806269887|31.97261819513622|19.355807170644777|15.95

In [29]:
#add column
df=df.withColumn('new',df['Glucose']+1)

In [30]:
df.head(3)

[Row(Pregnancies=6, Glucose=148, BloodPressure=72, SkinThickness=35, Insulin=0, BMI=33.6, DiabetesPedigreeFunction=0.627, Age=50, Outcome=1, new=149),
 Row(Pregnancies=1, Glucose=85, BloodPressure=66, SkinThickness=29, Insulin=0, BMI=26.6, DiabetesPedigreeFunction=0.351, Age=31, Outcome=0, new=86),
 Row(Pregnancies=8, Glucose=183, BloodPressure=64, SkinThickness=0, Insulin=0, BMI=23.3, DiabetesPedigreeFunction=0.672, Age=32, Outcome=1, new=184)]

In [31]:
from pyspark.sql.functions import when
# Assuming the glucose column is called "glucose"
df = df.withColumn("glucose_less_than_140", when(df["glucose"] < 140, 1).otherwise(0))

In [32]:
df.head(4)

[Row(Pregnancies=6, Glucose=148, BloodPressure=72, SkinThickness=35, Insulin=0, BMI=33.6, DiabetesPedigreeFunction=0.627, Age=50, Outcome=1, new=149, glucose_less_than_140=0),
 Row(Pregnancies=1, Glucose=85, BloodPressure=66, SkinThickness=29, Insulin=0, BMI=26.6, DiabetesPedigreeFunction=0.351, Age=31, Outcome=0, new=86, glucose_less_than_140=1),
 Row(Pregnancies=8, Glucose=183, BloodPressure=64, SkinThickness=0, Insulin=0, BMI=23.3, DiabetesPedigreeFunction=0.672, Age=32, Outcome=1, new=184, glucose_less_than_140=0),
 Row(Pregnancies=1, Glucose=89, BloodPressure=66, SkinThickness=23, Insulin=94, BMI=28.1, DiabetesPedigreeFunction=0.167, Age=21, Outcome=0, new=90, glucose_less_than_140=1)]

In [33]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+---+---------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|new|glucose_less_than_140|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+---+---------------------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|149|                    0|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0| 86|                    1|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|184|                    0|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0| 90|                    1|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|138|               

In [34]:
#drop column
df=df.drop('glucose_less_than_140')

In [35]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+---+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|new|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+---+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|149|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0| 86|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|184|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0| 90|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|138|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|117|
|          3|     78|           50|           32|     88|31.0|  

In [36]:
#rename column
df.withColumnRenamed('new','new_name').show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|new_name|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|     149|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|      86|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|     184|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|      90|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|     138|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|     117|
|          3|     7

In [37]:
df.head()

Row(Pregnancies=6, Glucose=148, BloodPressure=72, SkinThickness=35, Insulin=0, BMI=33.6, DiabetesPedigreeFunction=0.627, Age=50, Outcome=1, new=149)

In [38]:
df=df.withColumnRenamed('new','new_name')

In [39]:
df.head(2)

[Row(Pregnancies=6, Glucose=148, BloodPressure=72, SkinThickness=35, Insulin=0, BMI=33.6, DiabetesPedigreeFunction=0.627, Age=50, Outcome=1, new_name=149),
 Row(Pregnancies=1, Glucose=85, BloodPressure=66, SkinThickness=29, Insulin=0, BMI=26.6, DiabetesPedigreeFunction=0.351, Age=31, Outcome=0, new_name=86)]

In [40]:
#drop rows where there is a nan value
df=df.na.drop()

In [41]:
#how='any' (#drop rows where there is a nan value)
df=df.na.drop(how="any")

In [42]:
#how='all' (#drop rows where all records of that raw is nan)
df=df.na.drop(how="all")

In [43]:
#the method will remove rows that have n or more null or NaN values <=> thresh=n
df=df.na.drop(thresh=2)
#will remove raws where there are 2 nan values or more

In [44]:
#subset
df=df.na.drop(subset=['Outcome'])

This means that rows will only be dropped if they have a null or NaN value in the "Outcome" column

In [45]:
#fill missing values
#df=df.na.fill(value)=>will replace any nan value with the value specified
df=df.na.fill('missing value')

In [46]:
#df=df.na.fill(value,columns)=>will replace any nan value with the value specified just in the columns specified
df=df.na.fill('missing value','Outcome')
df=df.na.fill('missing value',['Outcome','Glucose'])

imputer.setStrategy("....")

So, setting strategy='mean' in the constructor is enough to specify the strategy for filling in missing values with the mean. However, if you need to modify the strategy after creating the Imputer object, you can use the setStrategy method to do so.

In [47]:
from pyspark.ml.feature import Imputer

# create a PySpark DataFrame with missing values
data = [(1, 2, None), (2, None, 4.0), (3, 4, 5.0)]
#data=[(record_1),......(record_n)]
df = spark.createDataFrame(data, ["a", "b", "c"])
print(df.printSchema())
df.show()
# create the Imputer object and fit it to the DataFrame
imputer = Imputer(strategy='mean', inputCols=df.columns, outputCols=df.columns)
imputer_model = imputer.fit(df)

# apply the imputer to the DataFrame
imputed_df = imputer_model.transform(df)

imputed_df.show()

root
 |-- a: long (nullable = true)
 |-- b: long (nullable = true)
 |-- c: double (nullable = true)

None
+---+----+----+
|  a|   b|   c|
+---+----+----+
|  1|   2|null|
|  2|null| 4.0|
|  3|   4| 5.0|
+---+----+----+

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|4.5|
|  2|  3|4.0|
|  3|  4|5.0|
+---+---+---+



In [48]:
from pyspark.ml.feature import Imputer

# create a PySpark DataFrame with missing values
data = [(1, 2, None), (2, None, 4), (3, 4, 5)]
#data=[(record_1),......(record_n)]
df = spark.createDataFrame(data, ["a", "b", "c"])
print(df.printSchema())
df.show()
# create the Imputer object and fit it to the DataFrame
imputer = Imputer(strategy='mean', inputCols=df.columns, outputCols=df.columns)
imputer_model = imputer.fit(df)

# apply the imputer to the DataFrame
imputed_df = imputer_model.transform(df)

imputed_df.show()

root
 |-- a: long (nullable = true)
 |-- b: long (nullable = true)
 |-- c: long (nullable = true)

None
+---+----+----+
|  a|   b|   c|
+---+----+----+
|  1|   2|null|
|  2|null|   4|
|  3|   4|   5|
+---+----+----+

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  4|
|  2|  3|  4|
|  3|  4|  5|
+---+---+---+



In [49]:
from pyspark.ml.feature import Imputer

# create a PySpark DataFrame with missing values
data = [(1, 2, None), (2, None, 4), (3, 4, 5)]
#data=[(record_1),......(record_n)]
df = spark.createDataFrame(data, ["a", "b", "c"])
print(df.printSchema())
df.show()
# create the Imputer object and fit it to the DataFrame
imputer = Imputer(strategy='mean', inputCols=df.columns, 
                  outputCols=["imputed_"+c for c in ["a", "b", "c"]])
imputer_model = imputer.fit(df)

# apply the imputer to the DataFrame
imputed_df = imputer_model.transform(df)

imputed_df.show()

root
 |-- a: long (nullable = true)
 |-- b: long (nullable = true)
 |-- c: long (nullable = true)

None
+---+----+----+
|  a|   b|   c|
+---+----+----+
|  1|   2|null|
|  2|null|   4|
|  3|   4|   5|
+---+----+----+

+---+----+----+---------+---------+---------+
|  a|   b|   c|imputed_a|imputed_b|imputed_c|
+---+----+----+---------+---------+---------+
|  1|   2|null|        1|        2|        4|
|  2|null|   4|        2|        3|        4|
|  3|   4|   5|        3|        4|        5|
+---+----+----+---------+---------+---------+



In [50]:
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35), ("Dave", 40), ("Eve", 45)]
df = spark.createDataFrame(data, ["name", "age"])

# filter the rows where age is greater than or equal to 35
filtered_df = df.filter("age>= 35")

# show the filtered DataFrame
filtered_df.show()
print('*****************')
filtered_df.select('age').show()


+-------+---+
|   name|age|
+-------+---+
|Charlie| 35|
|   Dave| 40|
|    Eve| 45|
+-------+---+

*****************
+---+
|age|
+---+
| 35|
| 40|
| 45|
+---+



In [51]:
# filter the rows where age is greater than or equal to 35
filtered_df = df.filter(df["age"]>= 35)
# show the filtered DataFrame
filtered_df.show()
print('*****************')
filtered_df.select('age').show()

+-------+---+
|   name|age|
+-------+---+
|Charlie| 35|
|   Dave| 40|
|    Eve| 45|
+-------+---+

*****************
+---+
|age|
+---+
| 35|
| 40|
| 45|
+---+



In [52]:
# filter the rows where age is greater than or equal to 35
#filtered_df = df.filter((df["age"]>= 35)&(df['col2']....))

In [54]:
# filter the rows where age is less than to 30
filtered_df = df.filter(~(df["age"]>= 30))
# show the filtered DataFrame
filtered_df.show()
print('*****************')
filtered_df.select('age').show()

+-----+---+
| name|age|
+-----+---+
|Alice| 25|
+-----+---+

*****************
+---+
|age|
+---+
| 25|
+---+



In [60]:
df=spark.read.csv('diabetes.csv',header=True,inferSchema=True)
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


In [62]:
#Groupby
df.groupBy('Outcome').sum().show()

+-------+----------------+------------+------------------+------------------+------------+-----------------+-----------------------------+--------+------------+
|Outcome|sum(Pregnancies)|sum(Glucose)|sum(BloodPressure)|sum(SkinThickness)|sum(Insulin)|         sum(BMI)|sum(DiabetesPedigreeFunction)|sum(Age)|sum(Outcome)|
+-------+----------------+------------+------------------+------------------+------------+-----------------+-----------------------------+--------+------------+
|      1|            1304|       37857|             18981|              5940|       26890|9418.199999999986|                      147.534|    9934|         268|
|      0|            1649|       54990|             34092|              9832|       34396|15152.09999999998|           214.86700000000008|   15595|           0|
+-------+----------------+------------+------------------+------------------+------------+-----------------+-----------------------------+--------+------------+



In [63]:
df.groupBy('Outcome').sum('Age').show()

+-------+--------+
|Outcome|sum(Age)|
+-------+--------+
|      1|    9934|
|      0|   15595|
+-------+--------+



In [64]:
df.groupBy('Outcome').mean().show()

+-------+-----------------+------------------+------------------+------------------+------------------+-----------------+-----------------------------+-----------------+------------+
|Outcome| avg(Pregnancies)|      avg(Glucose)|avg(BloodPressure)|avg(SkinThickness)|      avg(Insulin)|         avg(BMI)|avg(DiabetesPedigreeFunction)|         avg(Age)|avg(Outcome)|
+-------+-----------------+------------------+------------------+------------------+------------------+-----------------+-----------------------------+-----------------+------------+
|      1|4.865671641791045|141.25746268656715| 70.82462686567165| 22.16417910447761|100.33582089552239|35.14253731343278|                       0.5505|37.06716417910448|         1.0|
|      0|            3.298|            109.98|            68.184|            19.664|            68.792|30.30419999999996|          0.42973400000000017|            31.19|         0.0|
+-------+-----------------+------------------+------------------+------------------+-

In [65]:
df.groupBy('Outcome').mean('Age').show()

+-------+-----------------+
|Outcome|         avg(Age)|
+-------+-----------------+
|      1|37.06716417910448|
|      0|            31.19|
+-------+-----------------+



In [66]:
df.groupBy('Outcome').count().show()
#imbalanced data

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  268|
|      0|  500|
+-------+-----+



In [67]:
df.agg({'Age':'mean'}).show()

+------------------+
|          avg(Age)|
+------------------+
|33.240885416666664|
+------------------+



In [69]:
df.groupBy('Outcome').max().show()

+-------+----------------+------------+------------------+------------------+------------+--------+-----------------------------+--------+------------+
|Outcome|max(Pregnancies)|max(Glucose)|max(BloodPressure)|max(SkinThickness)|max(Insulin)|max(BMI)|max(DiabetesPedigreeFunction)|max(Age)|max(Outcome)|
+-------+----------------+------------+------------------+------------------+------------+--------+-----------------------------+--------+------------+
|      1|              17|         199|               114|                99|         846|    67.1|                         2.42|      70|           1|
|      0|              13|         197|               122|                60|         744|    57.3|                        2.329|      81|           0|
+-------+----------------+------------+------------------+------------------+------------+--------+-----------------------------+--------+------------+



In [68]:
df.groupBy('Outcome').max('Age').show()

+-------+--------+
|Outcome|max(Age)|
+-------+--------+
|      1|      70|
|      0|      81|
+-------+--------+



In [70]:
df.groupBy('Outcome').min('Age').show()

+-------+--------+
|Outcome|min(Age)|
+-------+--------+
|      1|      21|
|      0|      21|
+-------+--------+



In [80]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()

# Create a list of tuples representing the data
data = [(25, 3, 50000),
        (30, 5, 75000),
        (35, 8, 100000),
        (40, 10, 125000)]

# Create a DataFrame with the data
df = spark.createDataFrame(data, ["age", "experience", "salary"])

# Display the DataFrame
df.show()

+---+----------+------+
|age|experience|salary|
+---+----------+------+
| 25|         3| 50000|
| 30|         5| 75000|
| 35|         8|100000|
| 40|        10|125000|
+---+----------+------+



In [73]:
df.select('age').show()

+---+
|age|
+---+
| 25|
| 30|
| 35|
| 40|
+---+



In [76]:
df.columns

['features', 'salary']

In [81]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Select relevant columns and create a feature vector
assembler = VectorAssembler(inputCols=["age", "experience"], outputCol="features")
df = assembler.transform(df).select("features", "salary")
df.show()

+-----------+------+
|   features|salary|
+-----------+------+
| [25.0,3.0]| 50000|
| [30.0,5.0]| 75000|
| [35.0,8.0]|100000|
|[40.0,10.0]|125000|
+-----------+------+



In [None]:
# Split data into training and test sets
train_data, test_data = df.randomSplit([0.8, 0.2], seed=1234)
#80% for training data

In [82]:
# Create a LinearRegression object and fit to the training data
lr = LinearRegression(featuresCol="features", labelCol="salary")
model = lr.fit(train_data)

In [85]:
# Get the slope and intercept
slope = model.coefficients[0]
intercept = model.intercept

# Print the slope and intercept
print("Slope:", slope)
print("Intercept:", intercept)

Slope: 4999.999999997748
Intercept: -74999.99999995799


In [99]:
# Make predictions on the test data
predictions = model.transform(test_data)
predictions.show()

+----------+------+-----------------+
|  features|salary|       prediction|
+----------+------+-----------------+
|[30.0,5.0]| 75000|74999.99999999806|
+----------+------+-----------------+



In [102]:
# Make predictions on the test data
p = model.evaluate(test_data)
p.predictions.show()

+----------+------+-----------------+
|  features|salary|       prediction|
+----------+------+-----------------+
|[30.0,5.0]| 75000|74999.99999999806|
+----------+------+-----------------+



In [103]:
p.meanAbsoluteError,p.meanSquaredError

(1.9354047253727913e-09, 3.74579145099533e-18)

In [100]:
# Evaluate the model using Root Mean Squared Error (RMSE)
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="salary", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

# Print the RMSE
print("Root Mean Squared Error (RMSE):", rmse)


Root Mean Squared Error (RMSE): 1.9354047253727913e-09


In [101]:
# Make predictions on the test data
predictions = model.transform(test_data)

# Get the RMSE from the LinearRegressionSummary object
summary = model.summary
rmse = summary.rootMeanSquaredError

# Print the RMSE
print("Root Mean Squared Error (RMSE):", rmse)

Root Mean Squared Error (RMSE): 7.42100119844466e-10
