### In order for Python to find the Spark, download the findspark library and start it with findspark.init() function.

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
%pwd

'/content'

In [7]:
!unzip "/content/drive/MyDrive/Colab Notebooks/Spark_Lab/in.zip" -d "/content/drive/MyDrive/Colab Notebooks/Spark_Lab/"

Archive:  /content/drive/MyDrive/Colab Notebooks/Spark_Lab/in.zip
  inflating: /content/drive/MyDrive/Colab Notebooks/Spark_Lab/in.csv  


In [8]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=e165d958a08a0a5e5285471841b45c600267a39cea6f0868c06dcbd5a313fd60
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
# import findspark
# findspark.init()

In [10]:
!pip show pyspark

Name: pyspark
Version: 3.5.1
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /usr/local/lib/python3.10/dist-packages
Requires: py4j
Required-by: 


### In order to work with RDDs, we need to create a SparkContext.

In [11]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf


## Since we write local [*] in the master, it will use all cores in our machine. If we said local [4] it will work with 4 cores.

## getOrCreate is used to create a SparkSession if not present.

In [12]:
spark=SparkSession.builder\
    .master("local[*]")\
    .appName("WordCount")\
    .getOrCreate()

In [17]:
sc=spark.sparkContext

## Read Data - RomeoJuliet Txt File

In [14]:
%ls

[0m[01;34mdrive[0m/  [01;34msample_data[0m/


In [35]:
file_path="/content/drive/MyDrive/Colab Notebooks/Spark_Lab/in.csv"

In [56]:
# read the csv file
vodafone_rdd= spark.read.csv(file_path,header=False).rdd

### Load the data from CSV file

In [57]:
# show the first 100 lines
vodafone_rdd.take(100)

[Row(_c0='224', _c1='10'),
 Row(_c0='836', _c1='100'),
 Row(_c0='81', _c1='5'),
 Row(_c0='809', _c1='50'),
 Row(_c0='786', _c1='25'),
 Row(_c0='63', _c1='5'),
 Row(_c0='478', _c1='25'),
 Row(_c0='330', _c1='10'),
 Row(_c0='511', _c1='15'),
 Row(_c0='450', _c1='25'),
 Row(_c0='521', _c1='25'),
 Row(_c0='150', _c1='50'),
 Row(_c0='671', _c1='5'),
 Row(_c0='971', _c1='5'),
 Row(_c0='439', _c1='10'),
 Row(_c0='536', _c1='50'),
 Row(_c0='253', _c1='5'),
 Row(_c0='425', _c1='15'),
 Row(_c0='638', _c1='50'),
 Row(_c0='841', _c1='10'),
 Row(_c0='324', _c1='10'),
 Row(_c0='125', _c1='50'),
 Row(_c0='937', _c1='15'),
 Row(_c0='898', _c1='5'),
 Row(_c0='461', _c1='50'),
 Row(_c0='295', _c1='10'),
 Row(_c0='683', _c1='10'),
 Row(_c0='331', _c1='25'),
 Row(_c0='187', _c1='15'),
 Row(_c0='873', _c1='100'),
 Row(_c0='303', _c1='50'),
 Row(_c0='119', _c1='5'),
 Row(_c0='589', _c1='100'),
 Row(_c0='742', _c1='15'),
 Row(_c0='81', _c1='25'),
 Row(_c0='47', _c1='100'),
 Row(_c0='521', _c1='50'),
 Row(_c0

### Define map function to emit customer ID and prepaid card amount


In [59]:
def map_to_customer_amount(row):
    customer_id = row[0]  # customer ID is in the first column
    card_amount =  int(row[1])  # card amount is in the second column
    return (customer_id, card_amount)

### Map each row to (customer_id, card_amount)

In [60]:
vodafone_rdd=vodafone_rdd.map(map_to_customer_amount)

In [61]:
vodafone_rdd.take(10)

[('224', 10),
 ('836', 100),
 ('81', 5),
 ('809', 50),
 ('786', 25),
 ('63', 5),
 ('478', 25),
 ('330', 10),
 ('511', 15),
 ('450', 25)]

### Reduce by key to get total card amount for each customer


In [62]:
vodafone_rdd = vodafone_rdd.reduceByKey(lambda x,y:(x+y)).sortByKey()

### Collect and display the result


In [63]:
customer_total_amount = vodafone_rdd.collect()
for customer_id, total_amount in customer_total_amount:
    print("Customer ID:", customer_id, "Total Amount:", total_amount)

Customer ID: 0 Total Amount: 2554710
Customer ID: 1 Total Amount: 2540660
Customer ID: 10 Total Amount: 2525035
Customer ID: 100 Total Amount: 2568385
Customer ID: 101 Total Amount: 2564425
Customer ID: 102 Total Amount: 2538315
Customer ID: 103 Total Amount: 2564125
Customer ID: 104 Total Amount: 2555395
Customer ID: 105 Total Amount: 2569490
Customer ID: 106 Total Amount: 2555705
Customer ID: 107 Total Amount: 2550640
Customer ID: 108 Total Amount: 2551170
Customer ID: 109 Total Amount: 2560765
Customer ID: 11 Total Amount: 2561655
Customer ID: 110 Total Amount: 2563440
Customer ID: 111 Total Amount: 2528045
Customer ID: 112 Total Amount: 2543330
Customer ID: 113 Total Amount: 2544710
Customer ID: 114 Total Amount: 2538730
Customer ID: 115 Total Amount: 2541695
Customer ID: 116 Total Amount: 2539050
Customer ID: 117 Total Amount: 2558005
Customer ID: 118 Total Amount: 2567940
Customer ID: 119 Total Amount: 2581715
Customer ID: 12 Total Amount: 2567315
Customer ID: 120 Total Amount: 2

In [64]:
vodafone_rdd.take(4)

[('0', 2554710), ('1', 2540660), ('10', 2525035), ('100', 2568385)]

### Stop the Spark session

In [65]:
spark.stop()