Setting up PySpark in Colab

Spark is written in the Scala programming language and runs on the Java Virtual Machine (JVM). Therefore, our first task is to install Java.

In [None]:
!sudo apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


Next, we download Apache Spark 3.0.1 pre-built for Hadoop 2.7.

In [None]:
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz

Now we just need to extract the archive.

In [None]:
!tar xf spark-3.0.1-bin-hadoop2.7.tgz

The last thing we need to install is the findspark library. It locates the Spark installation and adds it to sys.path so that pyspark can be imported like a regular library.

In [None]:
!pip install -q findspark

Now that all the necessary dependencies are installed in Colab, it is time to set the environment variables. JAVA_HOME and SPARK_HOME tell PySpark where Java and Spark are installed, which lets us run PySpark in the Colab environment.

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

Next, we locate Spark on the system by importing findspark and calling findspark.init().

In [None]:
import findspark
findspark.init()
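Under the hood, findspark.init() makes pyspark importable by prepending Spark's bundled python directory and its Py4J zip to sys.path. The sketch below approximates that step in plain Python; it is a simplified illustration rather than findspark's actual code, and the helper name add_spark_to_sys_path is hypothetical.

```python
import glob
import os
import sys


def add_spark_to_sys_path(spark_home):
    """Hypothetical helper approximating what findspark.init() does.

    Prepends SPARK_HOME/python and the bundled py4j-*.zip to sys.path
    so that `import pyspark` can find Spark's Python package.
    """
    spark_python = os.path.join(spark_home, "python")
    py4j_zips = sorted(glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip")))
    if not py4j_zips:
        raise FileNotFoundError(f"No py4j zip found under {spark_python}/lib")
    os.environ["SPARK_HOME"] = spark_home
    # Put Spark's paths first so they take precedence over other installs.
    sys.path[:0] = [spark_python, py4j_zips[0]]
    return spark_python, py4j_zips[0]


# Usage in Colab, with the SPARK_HOME set above:
# add_spark_to_sys_path("/content/spark-3.0.1-bin-hadoop2.7")
```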

To see where Spark is installed, use findspark.find().

In [None]:
findspark.find()

'/content/spark-3.0.1-bin-hadoop2.7'

Now, we can import SparkSession from pyspark.sql and create a SparkSession, which is the entry point to Spark.

You can give a name to the session using appName() and add some configurations with config() if you wish.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

Finally, print the SparkSession variable.

In [None]:
spark

In [None]:
# Load the dataset with spark.read.csv().
# inferSchema=True lets Spark determine the data type of each column automatically,
# but it has to go over the data once more to do so.
# If you don't want that, provide the schema explicitly through the schema parameter instead.

df = spark.read.csv("/content/data.csv", header=True, inferSchema=True)
df.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



In [None]:
df.show(5, truncate=False)

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+-------------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines   |InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract      |PaperlessBilling|PaymentMethod            |MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+-------------------------+--------------+------------+-----+
|7590-VHVEG|Female|0            |Yes    |No        |1     |No          |No phone service|DSL            |No            |Yes         |No              |N

In [None]:
# Without inferSchema=True, every column is read as a string:
df_string = spark.read.csv("/content/data.csv", header=True, inferSchema=False)
df_string.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: string (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: string (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: string (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



In [None]:
# Use select() with column names to show only those columns. Below, gender and churn are selected.
df.select('gender','churn').show(5)

+------+-----+
|gender|churn|
+------+-----+
|Female|   No|
|  Male|   No|
|  Male|  Yes|
|  Male|   No|
|Female|  Yes|
+------+-----+
only showing top 5 rows



In [None]:
# describe() computes summary statistics for each column: count, mean, standard deviation, min and max.
df.describe().show()

+-------+----------+------+------------------+-------+----------+------------------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+------------------+------------------+-----+
|summary|customerID|gender|     SeniorCitizen|Partner|Dependents|            tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|    MonthlyCharges|      TotalCharges|Churn|
+-------+----------+------+------------------+-------+----------+------------------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+------------------+------------------+-----+
|  count|      7043|  7043|              7043|   7043|      7043|     

In [None]:
# Distinct values for categorical columns
# distinct() comes in handy for finding the unique values of a categorical column in the DataFrame.

df.select("PaymentMethod").distinct().show()

+--------------------+
|       PaymentMethod|
+--------------------+
|Credit card (auto...|
|        Mailed check|
|Bank transfer (au...|
|    Electronic check|
+--------------------+



**ICP STARTS HERE**

**1. Spark Transformations**

**a) Filter: Here I use the filter function to show only the customers who pay by electronic check and have a one-year contract**

In [None]:
df.filter("PaymentMethod='Electronic check' and contract='One year'").show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------+----------------+----------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|Contract|PaperlessBilling|   PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------+----------------+----------------+--------------+------------+-----+
|5067-XJQFU|  Male|            1|    Yes|       Yes|    66|         Yes|             Yes|  

In [None]:
df.filter("TotalCharges < 500").select(['Contract', 'TotalCharges']).show()

+--------------+------------+
|      Contract|TotalCharges|
+--------------+------------+
|Month-to-month|       29.85|
|Month-to-month|      108.15|
|Month-to-month|      151.65|
|Month-to-month|       301.9|
|      Two year|       326.8|
|Month-to-month|       39.65|
|      One year|      202.25|
|Month-to-month|       20.15|
|Month-to-month|        30.2|
|Month-to-month|      181.65|
|Month-to-month|        20.2|
|Month-to-month|       45.25|
|Month-to-month|       316.9|
|      One year|       475.7|
|      One year|      418.25|
|Month-to-month|          97|
|Month-to-month|      144.15|
|Month-to-month|       244.1|
|Month-to-month|       49.05|
|Month-to-month|       177.4|
+--------------+------------+
only showing top 20 rows



**b) Groupby: first the overall maximum of TotalCharges, then the data grouped by gender and InternetService with the maximum TotalCharges in each group**

In [None]:
max_total = df.agg({"TotalCharges": "max"})
max_total.show()

+-----------------+
|max(TotalCharges)|
+-----------------+
|            999.9|
+-----------------+



In [None]:
import pyspark.sql.functions as F
# Maximum TotalCharges for every (gender, InternetService) combination
df.groupBy("gender", "InternetService").agg(F.max("TotalCharges")).show()

+------+---------------+-----------------+
|gender|InternetService|max(TotalCharges)|
+------+---------------+-----------------+
|  Male|             No|           997.75|
|  Male|    Fiber optic|            999.8|
|Female|    Fiber optic|            999.9|
|Female|             No|            998.1|
|  Male|            DSL|           990.85|
|Female|            DSL|           996.85|
+------+---------------+-----------------+
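TotalCharges was loaded as a string (see the printed schema), so the maxima above are compared as text, character by character: "999.9" beats "1093.1" because '9' sorts after '1'. The plain-Python sketch below reproduces the effect with a few TotalCharges values taken from outputs in this notebook. In PySpark, casting the column first, for example with F.col("TotalCharges").cast("double"), should give numeric maxima instead.

```python
# A few TotalCharges values copied from outputs in this notebook, stored as
# strings the way Spark read them.
total_charges = ["29.85", "108.15", "1093.1", "999.9", "475.7"]

# String comparison: character by character, so "999.9" > "1093.1".
text_max = max(total_charges)

# Numeric comparison after casting, analogous to .cast("double") in Spark.
numeric_max = max(float(v) for v in total_charges)

print(text_max)     # 999.9
print(numeric_max)  # 1093.1
```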



**c) Join: computing the maximum TotalCharges per customerID and InternetService, aliasing it as TotalCharges, and inner-joining the result back onto the main DataFrame**

In [None]:
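# Maximum TotalCharges per customer and internet service, aliased back to TotalCharges
max_charges = df.groupBy("customerID", "InternetService") \
    .agg(F.max("TotalCharges").alias("TotalCharges"))
# Inner join on all three columns keeps only the rows whose TotalCharges equals that maximum
df.join(max_charges, on=["customerID", "InternetService", "TotalCharges"], how="inner").show()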

+----------+---------------+------------+------+-------------+-------+----------+------+------------+----------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+-----+
|customerID|InternetService|TotalCharges|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|Churn|
+----------+---------------+------------+------+-------------+-------+----------+------+------------+----------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+-----+
|7590-VHVEG|            DSL|       29.85|Female|            0

**Sort**

**We can also sort the joined result by PaymentMethod in descending order**

In [None]:
max_charges = df.groupBy("customerID", "InternetService") \
    .agg(F.max("TotalCharges").alias("TotalCharges"))
df.join(max_charges, on=["customerID", "InternetService", "TotalCharges"], how="inner") \
    .sort("PaymentMethod", ascending=False) \
    .show(10)

+----------+---------------+------------+------+-------------+-------+----------+------+------------+----------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+-------------+--------------+-----+
|customerID|InternetService|TotalCharges|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|PaymentMethod|MonthlyCharges|Churn|
+----------+---------------+------------+------+-------------+-------+----------+------+------------+----------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+-------------+--------------+-----+
|8773-HHUOZ|            DSL|      1093.1|Female|            0|     No|       Yes| 

**Showing rows with TotalCharges greater than $1500, sorted by TotalCharges**

In [None]:
from pyspark.sql.functions import col
df.filter("TotalCharges > 1500").sort(col("TotalCharges")).show()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|6614-YWYSC|  Male|            1|    Yes|        No|    61|  

In [None]:
df.sort("tenure","Gender").show(truncate=False)


+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+-------------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines   |InternetService|OnlineSecurity     |OnlineBackup       |DeviceProtection   |TechSupport        |StreamingTV        |StreamingMovies    |Contract      |PaperlessBilling|PaymentMethod            |MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+-------------------------+--------------+------------+-----+
|1371-DWPAZ|Female|0            |Yes    |Yes  

**d) orderBy: displaying the electronic-check, one-year-contract customers in descending order of MonthlyCharges**

In [None]:
from pyspark.sql.functions import col
df.filter("PaymentMethod='Electronic check' and contract='One year'").orderBy(col("MonthlyCharges"), ascending=False).show()

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+----------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|Contract|PaperlessBilling|   PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------+----------------+----------------+--------------+------------+-----+
|5734-EJKXG|Female|            0|     No|        No|    61|         Yes|          Yes|    Fiber optic|           Yes|         Yes|             Yes|        Yes|        Yes|            Yes|One year|         

**e) sortWithinPartitions: sorting by MultipleLines and then Dependents (both descending) within each partition. Unlike sort() or orderBy(), this does not produce a global order across partitions**

In [None]:
from pyspark.sql.functions import col
df.filter("gender='Female'").sortWithinPartitions([col("MultipleLines"), col("Dependents")], ascending=False).show()

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|     OnlineSecurity|       OnlineBackup|   DeviceProtection|        TechSupport|        StreamingTV|    StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------+----------------+--------------------+--------------+------------+-----+
|3655-SNQYZ|Female|            0|    Yes|       Yes|    69|         Ye
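Because sortWithinPartitions() orders each partition independently and skips the shuffle a global sort needs, rows are only sorted inside each partition. The plain-Python sketch below models partitions as lists (the numbers are made up for illustration) to contrast it with sort()/orderBy().

```python
# Hypothetical data split across three "partitions" (plain lists).
partitions = [[5, 1, 9], [7, 2], [8, 3, 6]]

# sortWithinPartitions(ascending=False): each partition sorted on its own.
within = [sorted(p, reverse=True) for p in partitions]

# sort()/orderBy(ascending=False): one global ordering across all rows.
global_sorted = sorted((x for p in partitions for x in p), reverse=True)

# Reading the partitions back one after another shows the difference.
concatenated = [x for p in within for x in p]

print(within)         # [[9, 5, 1], [7, 2], [8, 6, 3]]
print(concatenated)   # [9, 5, 1, 7, 2, 8, 6, 3]  (not globally sorted)
print(global_sorted)  # [9, 8, 7, 6, 5, 3, 2, 1]
```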

**f) Map and FlatMap:** map() applies a function to every element of an RDD and returns exactly one result per element, so splitting each StreamingTV value on spaces gives one list per row. flatMap() applies the function the same way and then flattens the results, so the resulting RDD holds a single word in each record.
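The same distinction in plain Python, as a small sketch using three StreamingTV values that appear in the outputs below (the list is illustrative, not the whole column):

```python
from itertools import chain

# A few StreamingTV values, as they appear in this notebook's outputs.
values = ["No", "Yes", "No internet service"]

# map: exactly one output element (here a list) per input element.
mapped = [v.split(" ") for v in values]

# flatMap: apply the same function, then flatten the lists into one sequence.
flat_mapped = list(chain.from_iterable(v.split(" ") for v in values))

print(mapped)       # [['No'], ['Yes'], ['No', 'internet', 'service']]
print(flat_mapped)  # ['No', 'Yes', 'No', 'internet', 'service']
```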

In [None]:
df.rdd.map(lambda row: row.StreamingTV.split(" ")).take(50)

[['No'],
 ['No'],
 ['No'],
 ['No'],
 ['No'],
 ['Yes'],
 ['Yes'],
 ['No'],
 ['Yes'],
 ['No'],
 ['No'],
 ['No', 'internet', 'service'],
 ['Yes'],
 ['Yes'],
 ['Yes'],
 ['Yes'],
 ['No', 'internet', 'service'],
 ['Yes'],
 ['No'],
 ['No'],
 ['No'],
 ['No', 'internet', 'service'],
 ['No', 'internet', 'service'],
 ['No'],
 ['No'],
 ['No'],
 ['Yes'],
 ['No'],
 ['Yes'],
 ['Yes'],
 ['No'],
 ['Yes'],
 ['No'],
 ['No', 'internet', 'service'],
 ['No'],
 ['Yes'],
 ['No'],
 ['No'],
 ['Yes'],
 ['Yes'],
 ['No'],
 ['Yes'],
 ['No', 'internet', 'service'],
 ['Yes'],
 ['Yes'],
 ['No'],
 ['No'],
 ['Yes'],
 ['Yes'],
 ['No']]

In [None]:
df.rdd.flatMap(lambda row: row.StreamingTV.split(" ")).take(50)

['No',
 'No',
 'No',
 'No',
 'No',
 'Yes',
 'Yes',
 'No',
 'Yes',
 'No',
 'No',
 'No',
 'internet',
 'service',
 'Yes',
 'Yes',
 'Yes',
 'Yes',
 'No',
 'internet',
 'service',
 'Yes',
 'No',
 'No',
 'No',
 'No',
 'internet',
 'service',
 'No',
 'internet',
 'service',
 'No',
 'No',
 'No',
 'Yes',
 'No',
 'Yes',
 'Yes',
 'No',
 'Yes',
 'No',
 'No',
 'internet',
 'service',
 'No',
 'Yes',
 'No',
 'No',
 'Yes',
 'Yes']

In [None]:
df.printSchema()

**g) startswith: showing the rows whose InternetService value starts with "F"**

In [None]:
from pyspark.sql.functions import col
# Keep the filtered DataFrame; show() only prints and returns None
new_df = df.filter(col("InternetService").startswith("F"))
new_df.show()

+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+-------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|9237-HQITU|Female|            0|     No|        No|     2|         Yes|           No|    Fiber optic|            No|          No|              No|         No|         No|    

**h) groupBy: grouping by PaymentMethod and computing the total tenure for each payment method with sum()**

In [None]:
df.groupBy("PaymentMethod").sum("Tenure").show(truncate=False)

+-------------------------+-----------+
|PaymentMethod            |sum(Tenure)|
+-------------------------+-----------+
|Credit card (automatic)  |65856      |
|Mailed check             |35190      |
|Bank transfer (automatic)|67406      |
|Electronic check         |59538      |
+-------------------------+-----------+
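Conceptually, groupBy("PaymentMethod").sum("Tenure") collects the tenure values for each distinct payment method and adds them up; Spark does this in parallel, computing partial sums per partition before combining them. A single-machine plain-Python sketch of the same aggregation, over a handful of made-up (PaymentMethod, tenure) pairs rather than rows from data.csv:

```python
from collections import defaultdict

# Made-up (PaymentMethod, tenure) pairs standing in for DataFrame rows.
rows = [
    ("Electronic check", 1),
    ("Mailed check", 34),
    ("Electronic check", 2),
    ("Bank transfer (automatic)", 45),
    ("Mailed check", 8),
]

# Group by payment method and sum the tenure values, like groupBy(...).sum(...).
sums = defaultdict(int)
for method, tenure in rows:
    sums[method] += tenure

print(dict(sums))
# {'Electronic check': 3, 'Mailed check': 42, 'Bank transfer (automatic)': 45}
```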



**2. Spark Actions**

**First: returning the first row of the DataFrame as a Row object**

In [None]:
df.first()

Row(customerID='7590-VHVEG', gender='Female', SeniorCitizen=0, Partner='Yes', Dependents='No', tenure=1, PhoneService='No', MultipleLines='No phone service', InternetService='DSL', OnlineSecurity='No', OnlineBackup='Yes', DeviceProtection='No', TechSupport='No', StreamingTV='No', StreamingMovies='No', Contract='Month-to-month', PaperlessBilling='Yes', PaymentMethod='Electronic check', MonthlyCharges=29.85, TotalCharges='29.85', Churn='No')

**a) Collect: returning every value of the PaymentMethod column to the driver node as a list of Row objects (the whole result must fit in driver memory, so use it with care on large data)**

In [None]:
df.select('PaymentMethod').collect()

[Row(PaymentMethod='Electronic check'),
 Row(PaymentMethod='Mailed check'),
 Row(PaymentMethod='Mailed check'),
 Row(PaymentMethod='Bank transfer (automatic)'),
 Row(PaymentMethod='Electronic check'),
 Row(PaymentMethod='Electronic check'),
 Row(PaymentMethod='Credit card (automatic)'),
 Row(PaymentMethod='Mailed check'),
 Row(PaymentMethod='Electronic check'),
 Row(PaymentMethod='Bank transfer (automatic)'),
 Row(PaymentMethod='Mailed check'),
 Row(PaymentMethod='Credit card (automatic)'),
 Row(PaymentMethod='Credit card (automatic)'),
 Row(PaymentMethod='Bank transfer (automatic)'),
 Row(PaymentMethod='Electronic check'),
 Row(PaymentMethod='Credit card (automatic)'),
 Row(PaymentMethod='Mailed check'),
 Row(PaymentMethod='Bank transfer (automatic)'),
 Row(PaymentMethod='Credit card (automatic)'),
 Row(PaymentMethod='Electronic check'),
 Row(PaymentMethod='Electronic check'),
 Row(PaymentMethod='Bank transfer (automatic)'),
 Row(PaymentMethod='Mailed check'),
 Row(PaymentMethod='Cred

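**Show: printing the PaymentMethod column; by default show() displays only the first 20 rows and truncates values longer than 20 characters**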

In [None]:
df.select('PaymentMethod').show()

+--------------------+
|       PaymentMethod|
+--------------------+
|    Electronic check|
|        Mailed check|
|        Mailed check|
|Bank transfer (au...|
|    Electronic check|
|    Electronic check|
|Credit card (auto...|
|        Mailed check|
|    Electronic check|
|Bank transfer (au...|
|        Mailed check|
|Credit card (auto...|
|Credit card (auto...|
|Bank transfer (au...|
|    Electronic check|
|Credit card (auto...|
|        Mailed check|
|Bank transfer (au...|
|Credit card (auto...|
|    Electronic check|
+--------------------+
only showing top 20 rows



**b) Count: counting the records in the PaymentMethod column. We can also do this for the entire DataFrame**

In [None]:
datacollect = df.select("PaymentMethod").count()
print(datacollect)

7043


In [None]:
df.count()

7043

**Counting null values in the Partner column**

In [None]:
df.filter(df["partner"].isNull()).count()

0

**c) Take: selecting the InternetService column and returning its first 20 rows as a list of Row objects**

In [None]:
df.select('InternetService').take(20)

[Row(InternetService='DSL'),
 Row(InternetService='DSL'),
 Row(InternetService='DSL'),
 Row(InternetService='DSL'),
 Row(InternetService='Fiber optic'),
 Row(InternetService='Fiber optic'),
 Row(InternetService='Fiber optic'),
 Row(InternetService='DSL'),
 Row(InternetService='Fiber optic'),
 Row(InternetService='DSL'),
 Row(InternetService='DSL'),
 Row(InternetService='No'),
 Row(InternetService='Fiber optic'),
 Row(InternetService='Fiber optic'),
 Row(InternetService='Fiber optic'),
 Row(InternetService='Fiber optic'),
 Row(InternetService='No'),
 Row(InternetService='Fiber optic'),
 Row(InternetService='DSL'),
 Row(InternetService='Fiber optic')]

**d) Min: using the aggregate function min to display the minimum MonthlyCharges in the DataFrame**

In [None]:
df.agg({"MonthlyCharges": "min"}).show()

+-------------------+
|min(MonthlyCharges)|
+-------------------+
|              18.25|
+-------------------+
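The dictionary form of agg() maps each column name to one function name, so a Python dict literal cannot request two aggregates of the same column: a repeated key silently keeps only the last entry. To get several aggregates of one column, the column functions can be used instead, e.g. df.agg(F.min("MonthlyCharges"), F.max("MonthlyCharges")). A plain-Python check of the dict behaviour:

```python
# A dict literal with a repeated key keeps only the last value for that key.
spec = {"MonthlyCharges": "min", "MonthlyCharges": "max"}

print(spec)       # {'MonthlyCharges': 'max'}
print(len(spec))  # 1
```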



**e) Sum: displaying the sum of MonthlyCharges for each contract type**

In [None]:
df.groupby("contract").agg({'MonthlyCharges':'sum'}).show()

+--------------+-------------------+
|      contract|sum(MonthlyCharges)|
+--------------+-------------------+
|Month-to-month| 257294.14999999892|
|      One year|  95816.59999999986|
|      Two year| 103005.84999999995|
+--------------+-------------------+



**f) Max: showing the maximum MonthlyCharges and the maximum TotalCharges. TotalCharges was read as a string, so its maximum (999.9) is the largest value in text order, not the largest amount**

In [None]:
df.agg({"MonthlyCharges":"max","TotalCharges":"max"}).show()

+-------------------+-----------------+
|max(MonthlyCharges)|max(TotalCharges)|
+-------------------+-----------------+
|             118.75|            999.9|
+-------------------+-----------------+



Map reduce example:


In [None]:
# The RDD API is reached through a SparkContext; getOrCreate() returns the one
# already started by the SparkSession above. The pipeline splits each line into
# words (flatMap), pairs each word with 1 (map) and adds up the pairs per word (reduceByKey).
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
text_file = sc.textFile("/content/icp4.txt")
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
for x in counts.collect():
    print(x)

('People', 1)
('who', 1)
('have', 1)
('been', 1)
('fully', 1)
('vaccinated', 1)
('against', 1)
('coronavirus', 1)
('--', 2)
('right', 1)
('now', 1)
('that', 1)
('means', 1)
('with', 2)
('two', 1)
('doses', 1)
('of', 1)
('either', 1)
('the', 4)
('Pfizer/BioNTech', 1)
('or', 1)
('Moderna', 1)
('vaccine', 1)
('can', 1)
('skip', 1)
('quarantine', 1)
('if', 1)
('they', 2)
('are', 1)
('exposed', 1)
('to', 2)
('someone', 1)
('infected', 1)
('virus,', 1)
('US', 1)
('Centers', 1)
('for', 2)
('Disease', 1)
('Control', 1)
('and', 1)
('Prevention', 1)
('said', 1)
('Wednesday.That', 1)
("doesn't", 1)
('mean', 1)
('should', 1)
('stop', 1)
('taking', 1)
('precautions,', 1)
('CDC', 1)
('noted', 1)
('in', 1)
('updated', 1)
('guidance.', 1)
("It's", 1)
('just', 1)
('not', 1)
('necessary', 1)
('them', 1)
('quarantine.', 1)
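For comparison, a single-machine plain-Python sketch of the same three steps; the function name word_count and the sample sentences are illustrative. Because the split is on single spaces, punctuation stays attached to words, which is why the Spark output above counts 'virus,' and 'Wednesday.That' as words.

```python
def word_count(lines):
    """Plain-Python analogue of flatMap -> map -> reduceByKey."""
    # flatMap: split every line on single spaces and flatten into one list of words.
    words = [word for line in lines for word in line.split(" ")]
    # map: pair each word with a count of 1.
    pairs = [(word, 1) for word in words]
    # reduceByKey: add up the counts that share the same word.
    counts = {}
    for word, one in pairs:
        counts[word] = counts.get(word, 0) + one
    return counts


sample = ["the virus, the CDC said", "the CDC noted"]
print(word_count(sample))
# {'the': 3, 'virus,': 1, 'CDC': 2, 'said': 1, 'noted': 1}
```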


In [None]:
rdd = sc.textFile("/content/icp4.txt")

In [None]:
rdd.take(5)

["People who have been fully vaccinated against coronavirus -- right now that means with two doses of either the Pfizer/BioNTech or Moderna vaccine -- can skip quarantine if they are exposed to someone infected with the virus, the US Centers for Disease Control and Prevention said Wednesday.That doesn't mean they should stop taking precautions, the CDC noted in updated guidance. It's just not necessary for them to quarantine."]
