In [2]:
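# Uncomment to install PySpark into the kernel's environment (this notebook was run with Spark 3.3.2):
# %pip install pyspark==3.3.2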

In [2]:
import pyspark
from pyspark.sql import SparkSession
# Build the notebook's Spark session, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("Practice")
    .getOrCreate()
)

In [4]:
# Bare expression as the cell's last line: displays the session object.
spark

# Read DataFrame

In [21]:
# Load the house-price CSV: the first row supplies the column names and the
# column types are inferred from the data.
df = (
    spark.read
    .option('header', 'true')
    .csv("E:/DS/Datasets/house_price.csv", inferSchema=True)
)
df.show()

+----+--------+-----+
|area|bedrooms|price|
+----+--------+-----+
|1056|       2|39.07|
|2600|       4|120.0|
|1440|       3| 62.0|
|1521|       3| 75.0|
|1200|       2| 51.0|
|1170|       2| 38.0|
|2732|       4|135.0|
|3300|       4|155.0|
|1310|       3| 50.0|
|3700|       5|167.0|
|1800|       3| 82.0|
|2785|       4|140.0|
|1000|       2| 38.0|
|1100|       2| 40.0|
|2250|       3|101.0|
|1175|       2| 42.0|
|1180|       3| 48.0|
|1540|       3| 60.0|
|2770|       3|102.0|
| 800|       1| 32.0|
+----+--------+-----+



In [22]:
# Return the first three rows as a list of Row objects.
df.head(3)

[Row(area=1056, bedrooms=2, price=39.07),
 Row(area=2600, bedrooms=4, price=120.0),
 Row(area=1440, bedrooms=3, price=62.0)]

In [23]:
# Print each column's name, inferred type and nullability as a tree.
df.printSchema()

root
 |-- area: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- price: double (nullable = true)



In [24]:
# Same file again, this time passing both reader options as keyword arguments.
df = spark.read.csv(
    "E:/DS/Datasets/house_price.csv",
    header=True,
    inferSchema=True,
)


In [25]:
# Preview only the first five rows.
df.show(5)

+----+--------+-----+
|area|bedrooms|price|
+----+--------+-----+
|1056|       2|39.07|
|2600|       4|120.0|
|1440|       3| 62.0|
|1521|       3| 75.0|
|1200|       2| 51.0|
+----+--------+-----+
only showing top 5 rows



In [29]:
# List the column names.
df.columns

['area', 'bedrooms', 'price']

In [30]:
# First three rows of the reloaded frame, as Row objects.
df.head(3)

[Row(area=1056, bedrooms=2, price=39.07),
 Row(area=2600, bedrooms=4, price=120.0),
 Row(area=1440, bedrooms=3, price=62.0)]

In [36]:
# Project two columns and preview the first three rows.
df.select('area', 'bedrooms').show(3)

+----+--------+
|area|bedrooms|
+----+--------+
|1056|       2|
|2600|       4|
|1440|       3|
+----+--------+
only showing top 3 rows



In [37]:
# (column name, type name) pairs for every column.
df.dtypes

[('area', 'int'), ('bedrooms', 'int'), ('price', 'double')]

In [38]:
# Summary statistics (count, mean, stddev, min, max) for each column.
df.describe().show()

+-------+-----------------+------------------+------------------+
|summary|             area|          bedrooms|             price|
+-------+-----------------+------------------+------------------+
|  count|               20|                20|                20|
|   mean|          1821.45|               2.9|           78.8535|
| stddev|864.6157938949713|0.9679060415469871|43.761901030943626|
|    min|              800|                 1|              32.0|
|    max|             3700|                 5|             167.0|
+-------+-----------------+------------------+------------------+



In [39]:
import math

In [57]:
# Add a derived column: 'side' is the square root of the area, i.e. the side
# length of a square plot with that area.
# The functions module is imported explicitly rather than reached through the
# `pyspark.sql.functions` attribute chain, which only resolves when some other
# import happens to have loaded that submodule already.
from pyspark.sql import functions as F

df = df.withColumn('side', F.sqrt(df['area']))

In [58]:

# Check the new 'side' column on the first three rows.
df.show(3)

+----+--------+-----+-----------------+
|Area|bedrooms|price|             side|
+----+--------+-----+-----------------+
|1056|       2|39.07|32.49615361854384|
|2600|       4|120.0|50.99019513592785|
|1440|       3| 62.0|37.94733192202055|
+----+--------+-----+-----------------+
only showing top 3 rows



In [59]:
# Remove the derived 'side' column again and confirm it is gone.
df = df.drop('side')
df.show(3)

+----+--------+-----+
|Area|bedrooms|price|
+----+--------+-----+
|1056|       2|39.07|
|2600|       4|120.0|
|1440|       3| 62.0|
+----+--------+-----+
only showing top 3 rows



In [60]:
# Rename 'area' to 'Area'; the other columns keep their names.
# NOTE(review): the saved outputs of the two cells above already show 'Area',
# which suggests the cells were executed out of order — restart the kernel and
# run all cells to confirm the notebook works top to bottom.
df = df.withColumnRenamed('area','Area')
df.show(3)

+----+--------+-----+
|Area|bedrooms|price|
+----+--------+-----+
|1056|       2|39.07|
|2600|       4|120.0|
|1440|       3| 62.0|
+----+--------+-----+
only showing top 3 rows



In [66]:
# Keep only the rows that have at least two non-null values.
df.na.drop(thresh=2).show()

+----+--------+-----+
|Area|bedrooms|price|
+----+--------+-----+
|1056|       2|39.07|
|2600|       4|120.0|
|1440|       3| 62.0|
|1521|       3| 75.0|
|1200|       2| 51.0|
|1170|       2| 38.0|
|2732|       4|135.0|
|3300|       4|155.0|
|1310|       3| 50.0|
|3700|       5|167.0|
|1800|       3| 82.0|
|2785|       4|140.0|
|1000|       2| 38.0|
|1100|       2| 40.0|
|2250|       3|101.0|
|1175|       2| 42.0|
|1180|       3| 48.0|
|1540|       3| 60.0|
|2770|       3|102.0|
| 800|       1| 32.0|
+----+--------+-----+



In [67]:
# Replace nulls with 0. A string fill value such as 'nan' is applied to string
# columns only, and every column of this frame is numeric (int, int, double),
# so the earlier string fill would have left any missing values untouched.
df.na.fill(0).show()

+----+--------+-----+
|Area|bedrooms|price|
+----+--------+-----+
|1056|       2|39.07|
|2600|       4|120.0|
|1440|       3| 62.0|
|1521|       3| 75.0|
|1200|       2| 51.0|
|1170|       2| 38.0|
|2732|       4|135.0|
|3300|       4|155.0|
|1310|       3| 50.0|
|3700|       5|167.0|
|1800|       3| 82.0|
|2785|       4|140.0|
|1000|       2| 38.0|
|1100|       2| 40.0|
|2250|       3|101.0|
|1175|       2| 42.0|
|1180|       3| 48.0|
|1540|       3| 60.0|
|2770|       3|102.0|
| 800|       1| 32.0|
+----+--------+-----+



##### Fill missing values

In [71]:
from pyspark.ml.feature import Imputer

# Mean-impute each listed column into a parallel '<name>_imputed' column.
impute_cols = ['Area', 'bedrooms', 'price']
imputer = Imputer(
    inputCols=impute_cols,
    outputCols=["{}_imputed".format(name) for name in impute_cols],
).setStrategy('mean')

In [72]:
# Fit the imputer on df, then apply the fitted model to the same frame.
imputer_model = imputer.fit(df)
imputer_model.transform(df).show()

+----+--------+-----+------------+----------------+-------------+
|Area|bedrooms|price|Area_imputed|bedrooms_imputed|price_imputed|
+----+--------+-----+------------+----------------+-------------+
|1056|       2|39.07|        1056|               2|        39.07|
|2600|       4|120.0|        2600|               4|        120.0|
|1440|       3| 62.0|        1440|               3|         62.0|
|1521|       3| 75.0|        1521|               3|         75.0|
|1200|       2| 51.0|        1200|               2|         51.0|
|1170|       2| 38.0|        1170|               2|         38.0|
|2732|       4|135.0|        2732|               4|        135.0|
|3300|       4|155.0|        3300|               4|        155.0|
|1310|       3| 50.0|        1310|               3|         50.0|
|3700|       5|167.0|        3700|               5|        167.0|
|1800|       3| 82.0|        1800|               3|         82.0|
|2785|       4|140.0|        2785|               4|        140.0|
|1000|    

##### Filter operation

In [73]:
import pyspark
from pyspark.sql import SparkSession
# Same builder call as at the top of the notebook. getOrCreate() is expected to
# hand back the already-running session rather than start a second one —
# TODO confirm against the SparkSession.builder documentation.
spark = SparkSession.builder.appName("Practice").getOrCreate()

In [74]:
# Reload the CSV so df has the file's original lower-case column names again
# (df's 'area' column was renamed to 'Area' earlier in the notebook).
df = spark.read.csv("E:/DS/Datasets/house_price.csv",inferSchema=True,header=True)

In [79]:
# Prices of the houses whose area is at most 1500.
small_houses = df.where('area<=1500')
small_houses.select('price').show()

+-----+
|price|
+-----+
|39.07|
| 62.0|
| 51.0|
| 38.0|
| 50.0|
| 38.0|
| 40.0|
| 42.0|
| 48.0|
| 32.0|
+-----+



In [85]:
# Rows that are both small (area <= 1500) and cheap (price <= 40).
is_small = df['area'] <= 1500
is_cheap = df['price'] <= 40
df.filter(is_small & is_cheap).show()

+----+--------+-----+
|area|bedrooms|price|
+----+--------+-----+
|1056|       2|39.07|
|1170|       2| 38.0|
|1000|       2| 38.0|
|1100|       2| 40.0|
| 800|       1| 32.0|
+----+--------+-----+



In [86]:
# Every row that is NOT (area <= 1500 and price <= 40).
is_small = df['area'] <= 1500
is_cheap = df['price'] <= 40
df.filter(~(is_small & is_cheap)).show()

+----+--------+-----+
|area|bedrooms|price|
+----+--------+-----+
|2600|       4|120.0|
|1440|       3| 62.0|
|1521|       3| 75.0|
|1200|       2| 51.0|
|2732|       4|135.0|
|3300|       4|155.0|
|1310|       3| 50.0|
|3700|       5|167.0|
|1800|       3| 82.0|
|2785|       4|140.0|
|2250|       3|101.0|
|1175|       2| 42.0|
|1180|       3| 48.0|
|1540|       3| 60.0|
|2770|       3|102.0|
+----+--------+-----+



##### Group by and Aggregate

In [90]:
# Per-bedroom-count totals of every numeric column; with no columns named,
# the grouping column itself is summed as well.
df.groupBy('bedrooms').sum().show()

+--------+---------+-------------+----------+
|bedrooms|sum(area)|sum(bedrooms)|sum(price)|
+--------+---------+-------------+----------+
|       1|      800|            1|      32.0|
|       3|    13811|           24|     580.0|
|       5|     3700|            5|     167.0|
|       4|    11417|           16|     550.0|
|       2|     6701|           12|    248.07|
+--------+---------+-------------+----------+



In [91]:
# Per-bedroom-count averages of every numeric column.
by_bedrooms = df.groupBy('bedrooms')
by_bedrooms.avg().show()

+--------+------------------+-------------+----------+
|bedrooms|         avg(area)|avg(bedrooms)|avg(price)|
+--------+------------------+-------------+----------+
|       1|             800.0|          1.0|      32.0|
|       3|          1726.375|          3.0|      72.5|
|       5|            3700.0|          5.0|     167.0|
|       4|           2854.25|          4.0|     137.5|
|       2|1116.8333333333333|          2.0|    41.345|
+--------+------------------+-------------+----------+



In [92]:
# Number of rows for each bedroom count.
df.groupBy('bedrooms').count().show()

+--------+-----+
|bedrooms|count|
+--------+-----+
|       1|    1|
|       3|    8|
|       5|    1|
|       4|    4|
|       2|    6|
+--------+-----+



In [97]:
# Total price over all rows; grouping by no columns aggregates the whole frame.
df.groupBy().agg({'price': 'sum'}).show()

+----------+
|sum(price)|
+----------+
|   1577.07|
+----------+



##### Spark MLlib

In [98]:
import pyspark
from pyspark.sql import SparkSession
# Same builder call as at the top of the notebook. getOrCreate() is expected to
# hand back the already-running session rather than start a second one —
# TODO confirm against the SparkSession.builder documentation.
spark = SparkSession.builder.appName("Practice").getOrCreate()

In [99]:
# Load the CSV once more so this section starts from the unmodified file.
df = spark.read.csv("E:/DS/Datasets/house_price.csv",inferSchema=True,header=True)

In [100]:
# List the column names.
df.columns

['area', 'bedrooms', 'price']

In [103]:
from pyspark.ml.feature import VectorAssembler

# Pack the two predictor columns into a single vector column named 'X'.
feature_cols = ['area', 'bedrooms']
assembler = VectorAssembler(inputCols=feature_cols, outputCol='X')

In [105]:
# Append the assembled feature vector 'X' to each row and preview the result.
out = assembler.transform(df)
out.show(3)

+----+--------+-----+------------+
|area|bedrooms|price|           X|
+----+--------+-----+------------+
|1056|       2|39.07|[1056.0,2.0]|
|2600|       4|120.0|[2600.0,4.0]|
|1440|       3| 62.0|[1440.0,3.0]|
+----+--------+-----+------------+
only showing top 3 rows



In [106]:
# Keep just the feature vector and the 'price' column for modelling.
train = out.select('X', 'price')
train.show(3)

+------------+-----+
|           X|price|
+------------+-----+
|[1056.0,2.0]|39.07|
|[2600.0,4.0]|120.0|
|[1440.0,3.0]| 62.0|
+------------+-----+
only showing top 3 rows



In [107]:
from pyspark.ml.regression import LinearRegression

# Hold out roughly a quarter of the rows for evaluation. The fixed seed makes
# the split — and therefore the fitted coefficients and the predictions that
# depend on it — repeatable; without a seed every run draws a different split.
train_data, test_data = train.randomSplit([0.75, 0.25], seed=42)

In [108]:
regressor = LinearRegression(featuresCol='X',labelCol='price')

In [109]:
regressor = regressor.fit(train_data)

In [111]:
# Fitted weights, one per input feature, in the assembler's order (area, bedrooms).
regressor.coefficients

DenseVector([0.0522, -0.345])

In [113]:
# Fitted intercept term.
regressor.intercept

-13.901241631776424

In [115]:
# Evaluate the fitted model on the held-out rows. Despite its name, `pred` is
# the evaluation result object; its .predictions attribute is the frame that
# carries the added 'prediction' column.
pred = regressor.evaluate(test_data)
pred.predictions.show()

+------------+-----+-----------------+
|           X|price|       prediction|
+------------+-----+-----------------+
|[1056.0,2.0]|39.07|40.51268928765786|
|[1521.0,3.0]| 75.0| 64.4322031047365|
|[1540.0,3.0]| 60.0|65.42365677473859|
|[2770.0,3.0]|102.0|129.6072364643472|
|[3300.0,4.0]|155.0|156.9185654682751|
+------------+-----+-----------------+



In [121]:
# Record the Spark version this notebook was run with.
print(spark.version)

3.3.2
