In [2]:
!pip install pyspark



In [10]:
!pip install uvicorn
!pip install fastapi

Collecting uvicorn
  Using cached uvicorn-0.22.0-py3-none-any.whl (58 kB)
Collecting h11>=0.8
  Using cached h11-0.14.0-py3-none-any.whl (58 kB)
Installing collected packages: h11, uvicorn
Successfully installed h11-0.14.0 uvicorn-0.22.0
Collecting fastapi
  Using cached fastapi-0.100.0-py3-none-any.whl (65 kB)
Collecting pydantic!=1.8,!=1.8.1,!=2.0.0,!=2.0.1,<3.0.0,>=1.7.4
  Using cached pydantic-2.0.2-py3-none-any.whl (359 kB)
Collecting typing-extensions>=4.5.0
  Downloading typing_extensions-4.7.1-py3-none-any.whl (33 kB)
Collecting starlette<0.28.0,>=0.27.0
  Using cached starlette-0.27.0-py3-none-any.whl (66 kB)
Collecting annotated-types>=0.4.0
  Using cached annotated_types-0.5.0-py3-none-any.whl (11 kB)
Collecting pydantic-core==2.1.2
  Downloading pydantic_core-2.1.2-cp39-cp39-macosx_10_7_x86_64.whl (1.5 MB)
[K     |████████████████████████████████| 1.5 MB 6.4 MB/s eta 0:00:01
[?25hCollecting anyio<5,>=3.4.0
  Using cached anyio-3.7.1-py3-none-any.whl (80 kB)
Collecting exc

In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session that every later cell relies on.
spark = (
    SparkSession.builder
    .appName('Model_Building')
    .getOrCreate()
)

# Read the CSV: the first row supplies the column names and column types are inferred.
df = spark.read.csv(
    'weight_height.csv',
    header=True,
    inferSchema=True,
)

23/07/12 02:36:09 WARN Utils: Your hostname, Prems-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.29.230 instead (on interface en0)
23/07/12 02:36:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/12 02:36:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Preview the first three rows of the raw data.
df.show(n=3)

+------+----------------+----------------+
|Gender|          Height|          Weight|
+------+----------------+----------------+
|  Male| 73.847017017515|241.893563180437|
|  Male|68.7819040458903|  162.3104725213|
|  Male|74.1101053917849|  212.7408555565|
+------+----------------+----------------+
only showing top 3 rows



# 1. Creating a machine learning model in PySpark

In [3]:
import numpy as np

In [4]:
# Print each column's name, inferred type and nullability for the loaded frame.
df.printSchema()

root
 |-- Gender: string (nullable = true)
 |-- Height: double (nullable = true)
 |-- Weight: double (nullable = true)



In [5]:
# Summary statistics (count, mean, stddev, min, max) for every column.
summary_stats = df.describe()
summary_stats.show()

+-------+------+-----------------+------------------+
|summary|Gender|           Height|            Weight|
+-------+------+-----------------+------------------+
|  count| 10000|            10000|             10000|
|   mean|  null|66.36755975482106|161.44035683283076|
| stddev|  null|3.847528120773333|32.108439006519674|
|    min|Female| 54.2631333250971|   64.700126712753|
|    max|  Male| 78.9987423463896|  269.989698505106|
+-------+------+-----------------+------------------+



In [6]:
# Count the missing entries (NaN or NULL) in every column.
from pyspark.sql.functions import col, isnan, when, count

# One aggregate expression per column, each aliased back to the column's own name.
missing_counts = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(missing_counts).show()

+------+------+------+
|Gender|Height|Weight|
+------+------+------+
|     0|     0|     0|
+------+------+------+



In [7]:
# Encode the categorical Gender column as a numeric index column named GenderMale.
from pyspark.ml.feature import StringIndexer

# `indexer` ends up bound to the fitted model, which later cells save and reuse.
indexer = StringIndexer(inputCol='Gender', outputCol='GenderMale').fit(df)
indexed_df = indexer.transform(df)
indexed_df.show()

+------+----------------+----------------+----------+
|Gender|          Height|          Weight|GenderMale|
+------+----------------+----------------+----------+
|  Male| 73.847017017515|241.893563180437|       1.0|
|  Male|68.7819040458903|  162.3104725213|       1.0|
|  Male|74.1101053917849|  212.7408555565|       1.0|
|  Male|71.7309784033377|220.042470303077|       1.0|
|  Male|69.8817958611153|206.349800623871|       1.0|
|  Male|67.2530156878065|152.212155757083|       1.0|
|  Male|68.7850812516616|183.927888604031|       1.0|
|  Male|68.3485155115879|167.971110489509|       1.0|
|  Male| 67.018949662883| 175.92944039571|       1.0|
|  Male|63.4564939783664|156.399676387112|       1.0|
|  Male|71.1953822829745|186.604925560358|       1.0|
|  Male|71.6408051192206|213.741169489411|       1.0|
|  Male|64.7663291334055|167.127461073476|       1.0|
|  Male|69.2830700967204|189.446181386738|       1.0|
|  Male|69.2437322298112|186.434168021239|       1.0|
|  Male|67.6456197004212|172

In [9]:
# Persist the fitted indexer so the same Gender encoding can be applied to new data.
# write().overwrite() replaces any copy left behind by a previous run; a plain save()
# raises when the target path already exists, which breaks re-running the notebook.
indexer.write().overwrite().save('string_indexer.model')

                                                                                

In [10]:
# Combine the model inputs (Height and the encoded gender) into one vector column.
from pyspark.ml.feature import VectorAssembler

feature_assembler = VectorAssembler(
    inputCols=['Height', 'GenderMale'],
    outputCol='Independent_feat',
)
output = feature_assembler.transform(indexed_df)
output.show(n=5)

+------+----------------+----------------+----------+--------------------+
|Gender|          Height|          Weight|GenderMale|    Independent_feat|
+------+----------------+----------------+----------+--------------------+
|  Male| 73.847017017515|241.893563180437|       1.0|[73.847017017515,...|
|  Male|68.7819040458903|  162.3104725213|       1.0|[68.7819040458903...|
|  Male|74.1101053917849|  212.7408555565|       1.0|[74.1101053917849...|
|  Male|71.7309784033377|220.042470303077|       1.0|[71.7309784033377...|
|  Male|69.8817958611153|206.349800623871|       1.0|[69.8817958611153...|
+------+----------------+----------------+----------+--------------------+
only showing top 5 rows



In [11]:
# Persist the feature assembler so inference builds the feature vector the same way.
# write().overwrite() replaces any copy left behind by a previous run; a plain save()
# raises when the target path already exists, which breaks re-running the notebook.
feature_assembler.write().overwrite().save('feature_assembler.model')

In [12]:
# Keep only what the regressor needs: the feature vector and the target column.
model_columns = ['Independent_feat', 'Weight']
finalized_data = output.select(model_columns)
finalized_data.show(n=5)

+--------------------+----------------+
|    Independent_feat|          Weight|
+--------------------+----------------+
|[73.847017017515,...|241.893563180437|
|[68.7819040458903...|  162.3104725213|
|[74.1101053917849...|  212.7408555565|
|[71.7309784033377...|220.042470303077|
|[69.8817958611153...|206.349800623871|
+--------------------+----------------+
only showing top 5 rows



In [13]:
from pyspark.ml.regression import LinearRegression

# A fixed seed makes the 75/25 split, and therefore the fitted coefficients and every
# downstream prediction, reproducible across kernel restarts.
train_data, test_data = finalized_data.randomSplit([0.75, 0.25], seed=42)

# Fit a linear regression of Weight on the assembled feature vector.
# `reg` ends up bound to the fitted model, which later cells inspect, evaluate and save.
reg = LinearRegression(featuresCol='Independent_feat', labelCol='Weight').fit(train_data)

23/07/12 02:36:46 WARN Instrumentation: [22c8c7b8] regParam is zero, which might cause numerical instability and overfitting.
23/07/12 02:36:47 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/07/12 02:36:47 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
23/07/12 02:36:47 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [14]:
# Show the learned feature weights followed by the intercept, one per line.
print(reg.coefficients, reg.intercept, sep='\n')

[5.974375425670641,19.368737273522576]
-244.73773884061185


In [15]:
# Peek at the first five rows of the held-out split.
test_data.show(n=5)

+--------------------+----------------+
|    Independent_feat|          Weight|
+--------------------+----------------+
|[54.8737275315254...|78.6066703120237|
|[55.336492408949,...|88.3665825783999|
|[55.6682021205121...|68.9825300912419|
|[56.0786997324948...|94.4883740514904|
|[56.4456850266095...|96.6402446637704|
+--------------------+----------------+
only showing top 5 rows



In [16]:
# Score the held-out split and display actual weights next to the predicted ones.
pred = reg.evaluate(test_data)
test_predictions = pred.predictions
test_predictions.show()

+--------------------+----------------+-----------------+
|    Independent_feat|          Weight|       prediction|
+--------------------+----------------+-----------------+
|[54.8737275315254...|78.6066703120237|   83.09851043868|
|[55.336492408949,...|88.3665825783999|85.86324155022302|
|[55.6682021205121...|68.9825300912419|87.84499989944189|
|[56.0786997324948...|94.4883740514904|90.29746674476783|
|[56.4456850266095...|96.6402446637704|92.48997466750924|
|[56.5341658080891...|97.7438964834685|93.01859207402507|
|[56.741741124191,...|103.540488116788|94.25872494152011|
|[56.7644564465812...|79.1743758333647| 94.3944348053943|
|[56.7854343692644...|83.9930774713752|94.51976479115442|
|[56.789386413216,...|95.3280876779566|94.54337578542001|
|[56.8103172829116...|84.1706947685606|94.66842465896733|
|[56.8560821293767...|97.3649783271705|94.94184103304801|
|[57.1038694679138...| 93.506315903823|96.42221561919641|
|[57.1373009574261...|99.1084992611307|96.62194788858224|
|[57.202660042

In [17]:
# Persist the fitted regression model for later inference.
# write().overwrite() replaces any copy left behind by a previous run; a plain save()
# raises when the target path already exists, which breaks re-running the notebook.
reg.write().overwrite().save('reg_model.model')

In [18]:
# Build and display a one-row frame shaped like a single inference request.
sample_rows = [(172.343, 'Male')]
sample_columns = ['Height', 'Gender']
spark.createDataFrame(sample_rows, sample_columns).show()

[Stage 29:>                                                         (0 + 1) / 1]

+-------+------+
| Height|Gender|
+-------+------+
|172.343|  Male|
+-------+------+



                                                                                

In [19]:
# Restore the regression model that was written to disk.
from pyspark.ml.regression import LinearRegressionModel

saved_model_path = 'reg_model.model'
load_model = LinearRegressionModel.load(saved_model_path)

In [20]:
# Score the held-out split with the reloaded model and display its predictions.
new_pred = load_model.evaluate(test_data)
reloaded_predictions = new_pred.predictions
reloaded_predictions.show()

+--------------------+----------------+-----------------+
|    Independent_feat|          Weight|       prediction|
+--------------------+----------------+-----------------+
|[54.8737275315254...|78.6066703120237|   83.09851043868|
|[55.336492408949,...|88.3665825783999|85.86324155022302|
|[55.6682021205121...|68.9825300912419|87.84499989944189|
|[56.0786997324948...|94.4883740514904|90.29746674476783|
|[56.4456850266095...|96.6402446637704|92.48997466750924|
|[56.5341658080891...|97.7438964834685|93.01859207402507|
|[56.741741124191,...|103.540488116788|94.25872494152011|
|[56.7644564465812...|79.1743758333647| 94.3944348053943|
|[56.7854343692644...|83.9930774713752|94.51976479115442|
|[56.789386413216,...|95.3280876779566|94.54337578542001|
|[56.8103172829116...|84.1706947685606|94.66842465896733|
|[56.8560821293767...|97.3649783271705|94.94184103304801|
|[57.1038694679138...| 93.506315903823|96.42221561919641|
|[57.1373009574261...|99.1084992611307|96.62194788858224|
|[57.202660042

In [22]:

from pyspark.ml.feature import StringIndexerModel, VectorAssembler
from pyspark.ml.regression import LinearRegressionModel

# Single observation to score.
# NOTE(review): the training heights summarised earlier range from roughly 54 to 79,
# so 172.34 appears to be in different units (centimetres?) than the training data.
# That would make the prediction a far extrapolation; confirm the intended units.
Height = 172.34343
Gender = 'Male'

# Wrap the observation in a one-row Spark DataFrame using the training column names.
test_df = spark.createDataFrame([(Height, Gender)], ['Height', 'Gender'])

# Load every persisted pipeline stage from disk.
indexer_model = StringIndexerModel.load('string_indexer.model')
assembler_model = VectorAssembler.load('feature_assembler.model')
reg_model = LinearRegressionModel.load('reg_model.model')

# Encode Gender with the *loaded* indexer (not the in-memory training object), so this
# cell exercises the saved artifacts and also works on a kernel that never ran training.
indexed_test_df = indexer_model.transform(test_df)

# Assemble the feature vector exactly as it was assembled for training.
transformed_test_df = assembler_model.transform(indexed_test_df)
transformed_test_df.show()

# Append the model's `prediction` column to the transformed row.
pred = reg_model.transform(transformed_test_df)

+---------+------+----------+----------------+
|   Height|Gender|GenderMale|Independent_feat|
+---------+------+----------+----------------+
|172.34343|  Male|       1.0| [172.34343,1.0]|
+---------+------+----------+----------------+



In [31]:
# Echo the DataFrame's repr to confirm that a `prediction` column was appended.
pred

DataFrame[Height: double, Gender: string, GenderMale: double, Independent_feat: vector, prediction: double]

In [33]:
# Display only the predicted weight for the single input row.
prediction_only = pred.select('prediction')
prediction_only.show()

+----------------+
|      prediction|
+----------------+
|804.275351400699|
+----------------+

