In [1]:
import os, sys
import socket
from operator import add
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for this notebook: three executor instances
# are requested, and spark.driver.host is set to this machine's resolved IP address.
# NOTE(review): presumably the explicit driver host is what lets cluster executors
# connect back to the driver -- confirm against the deployment.
sparkSession = (
    SparkSession.builder
    .appName("spark-k8s-datalake")
    .config("spark.executor.instances", "3")
    .config('spark.driver.host', socket.gethostbyname(socket.gethostname()))
    .getOrCreate()
)
#sc = sparkSession.sparkContext

# Load the sample CSV; the first line is the header and column types are inferred.
df = sparkSession.read.csv(
    'adl://xxxxx.azuredatalakestore.net/poc/kubernetes/sample_data.csv',
    inferSchema=True,
    header=True,
)

In [2]:
 # List the column names of the loaded DataFrame.
 df.columns

['ratings', 'age', 'experience', 'family', 'Mobile']

In [3]:
 # Number of columns in the DataFrame.
 len(df.columns)

5

In [5]:
 # Number of rows in the DataFrame (this triggers a Spark job).
 df.count()

33

In [9]:
# Print the DataFrame's shape as "<rows> <columns>".
row_count = df.count()
column_count = len(df.columns)
print(row_count, column_count)

33 5


In [10]:
# Print the schema as a tree: column name, inferred type and nullability.
df.printSchema()

root
 |-- ratings: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience: double (nullable = true)
 |-- family: double (nullable = true)
 |-- Mobile: string (nullable = true)



In [11]:
 # Preview the first 3 rows.
 df.show(3)

+-------+---+----------+------+-------+
|ratings|age|experience|family| Mobile|
+-------+---+----------+------+-------+
|      3| 32|       9.0|   3.0|   Vivo|
|      3| 27|      13.0|   3.0|  Apple|
|      4| 22|       2.5|   0.0|Samsung|
+-------+---+----------+------+-------+
only showing top 3 rows



In [12]:
# Project two columns and preview 5 rows.
# The schema declares the column as `Mobile`; the lower-case name used here still
# resolves, and the output header echoes the spelling passed to select().
(
    df.select('age', 'mobile')
    .show(n=5)
)

+---+-------+
|age| mobile|
+---+-------+
| 32|   Vivo|
| 27|  Apple|
| 22|Samsung|
| 37|  Apple|
| 27|     MI|
+---+-------+
only showing top 5 rows



In [13]:
# Summary statistics (count, mean, stddev, min, max) for every column.
df.describe().show()

+-------+------------------+------------------+------------------+------------------+------+
|summary|           ratings|               age|        experience|            family|Mobile|
+-------+------------------+------------------+------------------+------------------+------+
|  count|                33|                33|                33|                33|    33|
|   mean|3.5757575757575757|30.484848484848484|10.303030303030303|1.9090909090909092|  null|
| stddev|1.1188806636071336|  6.18527087180309| 6.770731351213326|2.0095651949432427|  null|
|    min|                 1|                22|               2.5|               0.0| Apple|
|    max|                 5|                42|              23.0|               5.5|  Vivo|
+-------+------------------+------------------+------------------+------------------+------+



In [14]:
# Derive a new column by adding 10 to `age`; show 10 rows without truncating values.
(
    df.withColumn("age_after_10_yrs", df["age"] + 10)
    .show(n=10, truncate=False)
)

+-------+---+----------+------+-------+----------------+
|ratings|age|experience|family|Mobile |age_after_10_yrs|
+-------+---+----------+------+-------+----------------+
|3      |32 |9.0       |3.0   |Vivo   |42              |
|3      |27 |13.0      |3.0   |Apple  |37              |
|4      |22 |2.5       |0.0   |Samsung|32              |
|4      |37 |16.5      |4.0   |Apple  |47              |
|5      |27 |9.0       |1.0   |MI     |37              |
|4      |27 |9.0       |0.0   |Oppo   |37              |
|5      |37 |23.0      |5.5   |Vivo   |47              |
|5      |37 |23.0      |5.5   |Samsung|47              |
|3      |22 |2.5       |0.0   |Apple  |32              |
|3      |27 |6.0       |0.0   |MI     |37              |
+-------+---+----------+------+-------+----------------+
only showing top 10 rows



In [15]:
from pyspark.sql.types import StringType,DoubleType
# Add a copy of `age` cast from integer to double; show 10 rows untruncated.
(
    df.withColumn('age_double', df['age'].cast(DoubleType()))
    .show(n=10, truncate=False)
)

+-------+---+----------+------+-------+----------+
|ratings|age|experience|family|Mobile |age_double|
+-------+---+----------+------+-------+----------+
|3      |32 |9.0       |3.0   |Vivo   |32.0      |
|3      |27 |13.0      |3.0   |Apple  |27.0      |
|4      |22 |2.5       |0.0   |Samsung|22.0      |
|4      |37 |16.5      |4.0   |Apple  |37.0      |
|5      |27 |9.0       |1.0   |MI     |27.0      |
|4      |27 |9.0       |0.0   |Oppo   |27.0      |
|5      |37 |23.0      |5.5   |Vivo   |37.0      |
|5      |37 |23.0      |5.5   |Samsung|37.0      |
|3      |22 |2.5       |0.0   |Apple  |22.0      |
|3      |27 |6.0       |0.0   |MI     |27.0      |
+-------+---+----------+------+-------+----------+
only showing top 10 rows



In [16]:
# Keep only the rows whose brand is Vivo (where() is an alias of filter()).
(
    df.where(df['mobile'] == 'Vivo')
    .show()
)

+-------+---+----------+------+------+
|ratings|age|experience|family|Mobile|
+-------+---+----------+------+------+
|      3| 32|       9.0|   3.0|  Vivo|
|      5| 37|      23.0|   5.5|  Vivo|
|      4| 37|       6.0|   0.0|  Vivo|
|      5| 37|      13.0|   1.0|  Vivo|
|      4| 37|       6.0|   0.0|  Vivo|
+-------+---+----------+------+------+



In [17]:
# Vivo rows only, projected down to three columns.
(
    df.where(df['mobile'] == 'Vivo')
    .select('age', 'ratings', 'mobile')
    .show()
)

+---+-------+------+
|age|ratings|mobile|
+---+-------+------+
| 32|      3|  Vivo|
| 37|      5|  Vivo|
| 37|      4|  Vivo|
| 37|      5|  Vivo|
| 37|      4|  Vivo|
+---+-------+------+



In [18]:
# Chained filters: each call narrows the previous result, so the two conditions
# are effectively AND-ed (Vivo rows with more than 10 years of experience).
(
    df.where(df['mobile'] == 'Vivo')
    .where(df['experience'] > 10)
    .show()
)

+-------+---+----------+------+------+
|ratings|age|experience|family|Mobile|
+-------+---+----------+------+------+
|      5| 37|      23.0|   5.5|  Vivo|
|      5| 37|      13.0|   1.0|  Vivo|
+-------+---+----------+------+------+



In [19]:
# Same selection expressed as one predicate; each comparison must be parenthesised
# because `&` binds tighter than `==` and `>` in Python.
df.where(
    (df['mobile'] == 'Vivo') & (df['experience'] > 10)
).show()

+-------+---+----------+------+------+
|ratings|age|experience|family|Mobile|
+-------+---+----------+------+------+
|      5| 37|      23.0|   5.5|  Vivo|
|      5| 37|      13.0|   1.0|  Vivo|
+-------+---+----------+------+------+



In [20]:
# Distinct values of the brand column.
df.select('mobile').distinct().show()

+-------+
| mobile|
+-------+
|     MI|
|   Oppo|
|Samsung|
|   Vivo|
|  Apple|
+-------+



In [21]:
# Row count per brand; show up to 5 groups without truncating values.
(
    df.groupBy('mobile')
    .count()
    .show(n=5, truncate=False)
)

+-------+-----+
|mobile |count|
+-------+-----+
|MI     |8    |
|Oppo   |7    |
|Samsung|6    |
|Vivo   |5    |
|Apple  |7    |
+-------+-----+



In [22]:
# Row count per brand, largest group first.
(
    df.groupBy('mobile')
    .count()
    .orderBy('count', ascending=False)
    .show(n=5, truncate=False)
)

+-------+-----+
|mobile |count|
+-------+-----+
|MI     |8    |
|Oppo   |7    |
|Apple  |7    |
|Samsung|6    |
|Vivo   |5    |
+-------+-----+



In [23]:
# Per-brand mean of every numeric column.
(
    df.groupBy('mobile')
    .mean()
    .show(n=5, truncate=False)
)

+-------+------------------+------------------+------------------+------------------+
|mobile |avg(ratings)      |avg(age)          |avg(experience)   |avg(family)       |
+-------+------------------+------------------+------------------+------------------+
|MI     |3.5               |30.125            |10.1875           |1.5               |
|Oppo   |2.857142857142857 |28.428571428571427|10.357142857142858|1.4285714285714286|
|Samsung|4.166666666666667 |28.666666666666668|8.666666666666666 |1.9166666666666667|
|Vivo   |4.2               |36.0              |11.4              |1.9               |
|Apple  |3.4285714285714284|30.571428571428573|11.0              |2.857142857142857 |
+-------+------------------+------------------+------------------+------------------+



In [24]:
# Per-brand sum of every numeric column.
(
    df.groupBy('mobile')
    .sum()
    .show(n=5, truncate=False)
)

+-------+------------+--------+---------------+-----------+
|mobile |sum(ratings)|sum(age)|sum(experience)|sum(family)|
+-------+------------+--------+---------------+-----------+
|MI     |28          |241     |81.5           |12.0       |
|Oppo   |20          |199     |72.5           |10.0       |
|Samsung|25          |172     |52.0           |11.5       |
|Vivo   |21          |180     |57.0           |9.5        |
|Apple  |24          |214     |77.0           |20.0       |
+-------+------------+--------+---------------+-----------+



In [25]:
# Per-brand maximum of every numeric column.
(
    df.groupBy('mobile')
    .max()
    .show(n=5, truncate=False)
)

+-------+------------+--------+---------------+-----------+
|mobile |max(ratings)|max(age)|max(experience)|max(family)|
+-------+------------+--------+---------------+-----------+
|MI     |5           |42      |23.0           |5.5        |
|Oppo   |4           |42      |23.0           |2.0        |
|Samsung|5           |37      |23.0           |5.5        |
|Vivo   |5           |37      |23.0           |5.5        |
|Apple  |4           |37      |16.5           |5.5        |
+-------+------------+--------+---------------+-----------+



In [26]:
# Per-brand minimum of every numeric column.
(
    df.groupBy('mobile')
    .min()
    .show(n=5, truncate=False)
)

+-------+------------+--------+---------------+-----------+
|mobile |min(ratings)|min(age)|min(experience)|min(family)|
+-------+------------+--------+---------------+-----------+
|MI     |1           |27      |2.5            |0.0        |
|Oppo   |2           |22      |6.0            |0.0        |
|Samsung|2           |22      |2.5            |0.0        |
|Vivo   |3           |32      |6.0            |0.0        |
|Apple  |3           |22      |2.5            |0.0        |
+-------+------------+--------+---------------+-----------+



In [27]:
# Aggregate a single column per brand using the {column: function} dict form of agg().
(
    df.groupBy('mobile')
    .agg({'experience': 'sum'})
    .show(n=5, truncate=False)
)

+-------+---------------+
|mobile |sum(experience)|
+-------+---------------+
|MI     |81.5           |
|Oppo   |72.5           |
|Samsung|52.0           |
|Vivo   |57.0           |
|Apple  |77.0           |
+-------+---------------+



In [33]:
from pyspark.sql.functions import udf
def price_range(brand):
    """Map a handset brand name to a price-tier label.

    Returns 'High Price' for 'Samsung' or 'Apple', 'Mid Price' for 'MI',
    and 'Low Price' for anything else. Matching is exact and case-sensitive.
    """
    if brand == 'MI':
        return 'Mid Price'
    return 'High Price' if brand in ('Samsung', 'Apple') else 'Low Price'
# Wrap price_range as a Spark UDF that returns a string, then apply it to the
# brand column to derive `price_range`; show 10 rows untruncated.
brand_udf = udf(price_range, StringType())
(
    df.withColumn('price_range', brand_udf(df['mobile']))
    .show(n=10, truncate=False)
)

+-------+---+----------+------+-------+-----------+
|ratings|age|experience|family|Mobile |price_range|
+-------+---+----------+------+-------+-----------+
|3      |32 |9.0       |3.0   |Vivo   |Low Price  |
|3      |27 |13.0      |3.0   |Apple  |High Price |
|4      |22 |2.5       |0.0   |Samsung|High Price |
|4      |37 |16.5      |4.0   |Apple  |High Price |
|5      |27 |9.0       |1.0   |MI     |Mid Price  |
|4      |27 |9.0       |0.0   |Oppo   |Low Price  |
|5      |37 |23.0      |5.5   |Vivo   |Low Price  |
|5      |37 |23.0      |5.5   |Samsung|High Price |
|3      |22 |2.5       |0.0   |Apple  |High Price |
|3      |27 |6.0       |0.0   |MI     |Mid Price  |
+-------+---+----------+------+-------+-----------+
only showing top 10 rows



In [34]:
def _age_group(age):
    """Label an age as "young" (30 or below) or "senior" (above 30).

    Returns None when `age` is None. The `age` column is nullable, and in
    Python 3 the comparison `None <= 30` raises TypeError, which would fail
    the whole Spark job; a null age now yields a null label instead.
    """
    if age is None:
        return None
    return "young" if age <= 30 else "senior"

# Register the labeller as a string-returning UDF and derive `age_group` from `age`.
age_udf = udf(_age_group, StringType())
df.withColumn("age_group", age_udf(df.age)).show(10,False)

+-------+---+----------+------+-------+---------+
|ratings|age|experience|family|Mobile |age_group|
+-------+---+----------+------+-------+---------+
|3      |32 |9.0       |3.0   |Vivo   |senior   |
|3      |27 |13.0      |3.0   |Apple  |young    |
|4      |22 |2.5       |0.0   |Samsung|young    |
|4      |37 |16.5      |4.0   |Apple  |senior   |
|5      |27 |9.0       |1.0   |MI     |young    |
|4      |27 |9.0       |0.0   |Oppo   |young    |
|5      |37 |23.0      |5.5   |Vivo   |senior   |
|5      |37 |23.0      |5.5   |Samsung|senior   |
|3      |22 |2.5       |0.0   |Apple  |young    |
|3      |27 |6.0       |0.0   |MI     |young    |
+-------+---+----------+------+-------+---------+
only showing top 10 rows



In [None]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType
def remaining_yrs(age):
    """Return the number of years left until age 100, i.e. ``100 - age``.

    Works for any operand that supports subtraction from an int.
    """
    return 100 - age
# Wrap remaining_yrs as a pandas UDF with an integer return type and use it to
# derive `yrs_left` from `age`; show 10 rows untruncated.
length_udf = pandas_udf(f=remaining_yrs, returnType=IntegerType())
(
    df.withColumn("yrs_left", length_udf(df['age']))
    .show(n=10, truncate=False)
)
# NOTE: the recorded run of this cell failed with
# ModuleNotFoundError: No module named 'pyarrow'

In [None]:
def prod(rating, exp):
    """Return the product ``rating * exp``."""
    return rating * exp
# Wrap prod as a pandas UDF with a double return type and use it to derive
# `product` from `ratings` and `experience`; show 10 rows untruncated.
prod_udf = pandas_udf(f=prod, returnType=DoubleType())
(
    df.withColumn("product", prod_udf(df['ratings'], df['experience']))
    .show(n=10, truncate=False)
)
# NOTE: the recorded run of this cell failed with
# ModuleNotFoundError: No module named 'pyarrow'