# Spark Preparation
We first check whether the notebook is running in Google Colab. If so, we install all the necessary packages.

To run Spark in Colab, we first need to install its dependencies in the Colab environment. These are Apache Spark 3.3.2 prebuilt for Hadoop 3, Java 8, and findspark, which locates the Spark installation so that it can be imported from Python. All of these can be installed from within the Colab notebook itself.
Learn more from [A Must-Read Guide on How to Work with PySpark on Google Colab for Data Scientists!](https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/)

In [None]:
try:
  import google.colab  # this module only exists inside Colab
  IN_COLAB = True
except ImportError:
  IN_COLAB = False

In [None]:
if IN_COLAB:
    !apt-get install openjdk-8-jdk-headless -qq > /dev/null
    !wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
    !tar xf spark-3.3.2-bin-hadoop3.tgz
    !mv spark-3.3.2-bin-hadoop3 spark
    %pip install -q findspark
    import os
    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
    os.environ["SPARK_HOME"] = "/content/spark"

# Start a Local Cluster
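Use findspark.init() to locate the Spark installation (via SPARK_HOME) and make pyspark importable. With spark_url = 'local', Spark then runs in local mode inside this notebook. If you plan to use a remote cluster, skip findspark.init() and change spark_url accordingly.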

In [None]:
import findspark
findspark.init()

In [None]:
spark_url = 'local'
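For reference, spark_url accepts Spark's standard master URL forms: 'local' (one worker thread), 'local[N]' or 'local[*]' (N threads, or one per CPU core), and 'spark://HOST:PORT' for a standalone cluster, whose master listens on port 7077 by default. The helper below is a hypothetical convenience function, not part of PySpark, and the host name in the example is a placeholder.

```python
def master_url(host=None, port=7077, threads="*"):
    """Hypothetical helper: build a Spark master URL.

    host=None  -> local mode, e.g. 'local[*]' (one thread per core)
    host='x'   -> standalone cluster, e.g. 'spark://x:7077'
    """
    if host is None:
        return f"local[{threads}]"
    return f"spark://{host}:{port}"


print(master_url())                        # local[*]
print(master_url(threads=2))               # local[2]
print(master_url("spark-master.example"))  # spark://spark-master.example:7077 (placeholder host)
```

For example, spark_url = master_url() would run Spark locally on all available cores.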

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master(spark_url)\
        .appName('Spark Tutorial')\
        .config('spark.ui.port', '4040')\
        .getOrCreate()

In [None]:
path = 'KU_cited.csv'
df = spark.read.csv(path, header=True, inferSchema=True)


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split, regexp_replace, sum as _sum, collect_list, concat_ws

# Initialize Spark session
spark = SparkSession.builder \
    .appName("CSV Analysis") \
    .getOrCreate()

# Load the CSV file
file_path = "KU_cited.csv"
df = spark.read.option("header", "true").csv(file_path)

# Debug: Show the initial data
df.show(truncate=False)

# Function to process a column and ensure proper cleaning and splitting
def process_column(df, col_name, new_col):
    # Clean the list format, remove unwanted characters and spaces
    cleaned_col = regexp_replace(col(col_name), "[\\[\\]']", "")

    # Explode the list and split into 'key' and 'value'
    df = df.withColumn(new_col, explode(split(cleaned_col, ", "))) \
           .withColumn("key", split(col(new_col), "~")[0]) \
           .withColumn("value", split(col(new_col), "~")[1].cast("int")) \
           .na.drop(subset=["value"])  # Drop rows with null values in the 'value' column

    # Clean the 'key' column by removing unwanted characters like quotes or other special chars
    df = df.withColumn("key", regexp_replace(col("key"), '[^a-zA-Z0-9]', ''))  # Remove non-alphanumeric characters

    return df

# Process and group year column
year_df = process_column(df, "year", "year_item")
year_grouped = year_df.groupBy("key").agg(
    _sum("value").alias("sum"),
    collect_list("order").alias("list_of_orders")
)

# Convert array of orders to a comma-separated string
year_grouped = year_grouped.withColumn("list_of_orders", concat_ws(",", col("list_of_orders")))

# Show final year table
print("Year Table:")
year_grouped.show(truncate=False)

# Process and group subject column
subject_df = process_column(df, "subject", "subject_item")
subject_grouped = subject_df.groupBy("key").agg(
    _sum("value").alias("sum"),
    collect_list("order").alias("list_of_orders")
)

# Convert array of orders to a comma-separated string
subject_grouped = subject_grouped.withColumn("list_of_orders", concat_ws(",", col("list_of_orders")))

# Show final subject table
print("Subject Table:")
subject_grouped.show(truncate=False)

# Process and group country1 column
country_df = process_column(df, "country1", "country_item")
country_grouped = country_df.groupBy("key").agg(
    _sum("value").alias("sum"),
    collect_list("order").alias("list_of_orders")
)

# Convert array of orders to a comma-separated string
country_grouped = country_grouped.withColumn("list_of_orders", concat_ws(",", col("list_of_orders")))

# Show final country1 table
print("Country Table:")
country_grouped.show(truncate=False)

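# Save results to CSV. Each path becomes a directory of part files;
# mode("overwrite") lets the cell be re-run instead of failing because the path already exists.
output_path = "/content"
year_grouped.write.mode("overwrite").csv(output_path + "/year_sum_list.csv", header=True)
subject_grouped.write.mode("overwrite").csv(output_path + "/subject_sum_list.csv", header=True)
country_grouped.write.mode("overwrite").csv(output_path + "/country_sum_list.csv", header=True)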


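To make the expected cell format explicit, here is a pure-Python model of what process_column and the groupBy/agg step do to one column. The input format is a stringified list such as "['2019~3', '2020~5']" with key~value items. It is inferred from the regular expressions in the code, not from the actual CSV files. The names parse_cell and group_column are hypothetical. Spark's cast rules differ in some edge cases, so treat this as an approximation for sanity checks.

```python
import re
from collections import defaultdict


def parse_cell(cell):
    """Model of process_column() for one cell: return a list of (key, value) pairs."""
    if cell is None:
        return []  # explode() on a null array produces no rows
    cleaned = re.sub(r"[\[\]']", "", cell)  # same pattern as regexp_replace
    pairs = []
    for item in cleaned.split(", "):  # split(cleaned_col, ", ") followed by explode
        parts = item.split("~")
        if len(parts) < 2:
            continue  # missing value -> null -> removed by na.drop
        try:
            value = int(parts[1])
        except ValueError:
            continue  # non-numeric value -> null after cast("int") -> removed
        key = re.sub(r"[^a-zA-Z0-9]", "", parts[0])  # strip non-alphanumerics
        pairs.append((key, value))
    return pairs


def group_column(rows, column, id_column):
    """Model of groupBy('key').agg(sum, collect_list) followed by concat_ws(',')."""
    sums = defaultdict(int)
    ids = defaultdict(list)
    for row in rows:
        for key, value in parse_cell(row.get(column)):
            sums[key] += value
            ids[key].append(row[id_column])
    return {key: (sums[key], ",".join(ids[key])) for key in sums}


rows = [
    {"order": "1", "year": "['2019~3', '2020~5']"},
    {"order": "2", "year": "['2019~1']"},
]
print(group_column(rows, "year", "order"))  # {'2019': (4, '1,2'), '2020': (5, '1')}
```

Note that the key cleanup also removes spaces, so a value such as 'United States' becomes 'UnitedStates'. Also, Spark does not guarantee the order of the elements that collect_list gathers, so list_of_orders may come out in a different order than this model shows.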

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split, regexp_replace, sum as _sum, collect_list, concat_ws

# Initialize Spark session
spark = SparkSession.builder \
    .appName("CSV Analysis") \
    .getOrCreate()

# Load the CSV file
file_path = "Citebydata_Final.csv"
df = spark.read.option("header", "true").csv(file_path)

# Debug: Show the initial data
df.show(truncate=False)

# Function to process a column and ensure proper cleaning and splitting
def process_column(df, col_name, new_col):
    # Clean the list format, remove unwanted characters and spaces
    cleaned_col = regexp_replace(col(col_name), "[\\[\\]']", "")

    # Explode the list and split into 'key' and 'value'
    df = df.withColumn(new_col, explode(split(cleaned_col, ", "))) \
           .withColumn("key", split(col(new_col), "~")[0]) \
           .withColumn("value", split(col(new_col), "~")[1].cast("int")) \
           .na.drop(subset=["value"])  # Drop rows with null values in the 'value' column

    # Clean the 'key' column by removing unwanted characters like quotes or other special chars
    df = df.withColumn("key", regexp_replace(col("key"), '[^a-zA-Z0-9]', ''))  # Remove non-alphanumeric characters

    return df

# Process and group year column
year_df = process_column(df, "year", "year_item")
year_grouped = year_df.groupBy("key").agg(
    _sum("value").alias("sum"),
    collect_list("filename").alias("list_of_orders")
)

# Convert array of orders to a comma-separated string
year_grouped = year_grouped.withColumn("list_of_orders", concat_ws(",", col("list_of_orders")))

# Show final year table
print("Year Table:")
year_grouped.show(truncate=False)

# Process and group subject column
subject_df = process_column(df, "subj", "subject_item")
subject_grouped = subject_df.groupBy("key").agg(
    _sum("value").alias("sum"),
    collect_list("filename").alias("list_of_orders")
)

# Convert array of orders to a comma-separated string
subject_grouped = subject_grouped.withColumn("list_of_orders", concat_ws(",", col("list_of_orders")))

# Show final subject table
print("Subject Table:")
subject_grouped.show(truncate=False)

# Process and group country column
country_df = process_column(df, "country", "country_item")
country_grouped = country_df.groupBy("key").agg(
    _sum("value").alias("sum"),
    collect_list("filename").alias("list_of_orders")
)

# Convert array of orders to a comma-separated string
country_grouped = country_grouped.withColumn("list_of_orders", concat_ws(",", col("list_of_orders")))

# Show final country table
print("Country Table:")
country_grouped.show(truncate=False)

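# Save results to CSV. Each path becomes a directory of part files;
# mode("overwrite") lets the cell be re-run instead of failing because the path already exists.
output_path = "/content"
year_grouped.write.mode("overwrite").csv(output_path + "/CU_year_sum_list.csv", header=True)
subject_grouped.write.mode("overwrite").csv(output_path + "/CU_subject_sum_list.csv", header=True)
country_grouped.write.mode("overwrite").csv(output_path + "/CU_country_sum_list.csv", header=True)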


(Output truncated. The df.show() header lists the input columns: filename, num, year, subj, country.)


# Spark Entry Points

In [None]:
sc = spark.sparkContext

In [None]:
spark

In [None]:
sc

## Simple RDD Operations

There are two types of RDD operations: transformations and actions. A transformation is applied to an RDD to create a new RDD (or creates a new RDD from data). Transformations are lazy. Spark only records them and computes nothing until an action is called. An action is applied to an RDD to perform the computation and send the result back to the driver.

### Transformation Operations
- *sc.parallelize(data)*
create an RDD from data
- *rdd.filter(func)*
create a new rdd from an existing rdd, keeping only the elements for which func returns True

### Action Operations
- *rdd.count()*
count the number of elements in the rdd
- *rdd.first()*
get the first element in the rdd
- *rdd.collect()*
gather all elements in the rdd into a python list
- *rdd.take(n)*
gather the first n elements in the rdd into a python list
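The difference between lazy transformations and eager actions can be sketched in plain Python. MiniRDD below is a hypothetical toy class, not part of PySpark. Its filter and map only record a step, and nothing runs until count, first, collect or take is called. Real RDDs also partition the data and track lineage, which this toy ignores.

```python
class MiniRDD:
    """Toy model of an RDD: transformations are recorded, actions execute them."""

    def __init__(self, data, steps=()):
        self._data = list(data)
        self._steps = tuple(steps)  # pending transformations, applied lazily

    # Transformations: return a new MiniRDD and compute nothing
    def filter(self, func):
        return MiniRDD(self._data, self._steps + (("filter", func),))

    def map(self, func):
        return MiniRDD(self._data, self._steps + (("map", func),))

    # Internal: run all recorded steps over the data
    def _compute(self):
        items = self._data
        for kind, func in self._steps:
            if kind == "filter":
                items = [x for x in items if func(x)]
            else:
                items = [func(x) for x in items]
        return items

    # Actions: trigger the computation and return plain Python values
    def collect(self):
        return self._compute()

    def count(self):
        return len(self._compute())

    def first(self):
        return self._compute()[0]

    def take(self, n):
        return self._compute()[:n]


calls = []


def is_big(d):
    calls.append(d)  # record every time the predicate actually runs
    return d > 2


f_rdd = MiniRDD([1, 2, 3, 4, 5]).filter(is_big)
print(len(calls))       # 0: filter was only recorded
print(f_rdd.collect())  # [3, 4, 5]
print(len(calls))       # 5: the action ran the predicate on every element
```

Each action recomputes the pipeline from scratch. Spark does the same for an RDD that has not been cached with rdd.cache().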

In [None]:
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

In [None]:
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

In [None]:
n = rdd.count()
print('count = {0}'.format(n))

count = 5


In [None]:
rdd.first()

1

In [None]:
l = rdd.collect()
print(l)

[1, 2, 3, 4, 5]


In [None]:
l = rdd.take(3)
print(l)

[1, 2, 3]


In [None]:
f_rdd = rdd.filter(lambda d: d > 2)

In [None]:
f_rdd.collect()

[3, 4, 5]

In [None]:
f_rdd.count()

3

## RDD Operations - map and reduce

- *rdd.map(func)* -- **transformation** --
create a new rdd by applying function func to each element of the rdd
- *rdd.reduce(func)* -- **action** --
aggregate all elements in an rdd using function func

These two operations apply a function to the elements of an RDD, and the function is usually supplied as a lambda. Any lambda can be used as long as it has the right shape. For map, the function must take one input and return one output. For reduce, it must take two inputs and return one output of the same type. Each partition is reduced separately and the partial results are then combined, so the reduce function should also be commutative and associative (like + or *, but not -).
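Because these lambdas are plain Python functions, one way to check their shape is to try them with Python's built-in map and functools.reduce on a small list before passing them to Spark. This is only a local sanity check, not how Spark executes them.

```python
from functools import reduce

data = ['line 1', '2', 'more lines', 'last line']

# map: one input -> one output (here, the length of each line)
line_lengths = list(map(lambda line: len(line), data))

# reduce: two inputs -> one output of the same type (here, a running sum)
total_length = reduce(lambda a, b: a + b, line_lengths)

print(line_lengths)  # [6, 1, 10, 9]
print(total_length)  # 26
```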

In [None]:
data = ['line 1', '2', 'more lines', 'last line']

In [None]:
lines = sc.parallelize(data)

In [None]:
print(lines)

ParallelCollectionRDD[6] at readRDDFromFile at PythonRDD.scala:274


In [None]:
print(lines.collect())

['line 1', '2', 'more lines', 'last line']


Count the length of each line in the RDD and store the results in a new RDD.

In [None]:
lineLengths = lines.map(lambda line: len(line))
print(lineLengths.collect())

[6, 1, 10, 9]


Sum the lengths of all lines in the RDD. Because the RDD is partitioned, this reduce operation runs in parallel: each partition is summed separately and the partial sums are then combined.

In [None]:
totalLength = lineLengths.reduce(lambda a, b: a+b)
print(totalLength)

26
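The partition-then-combine behaviour can be modelled in plain Python, which also shows why the reduce function must be associative and commutative. The slicing below approximates how sc.parallelize splits a list into contiguous slices, but it is a simplified model. Real Spark may also merge partition results in whatever order tasks finish.

```python
from functools import reduce
from operator import add, sub


def slice_partitions(data, num_partitions):
    """Split data into contiguous slices of (almost) equal size."""
    n = len(data)
    return [data[(i * n) // num_partitions:((i + 1) * n) // num_partitions]
            for i in range(num_partitions)]


def partitioned_reduce(data, func, num_partitions):
    """Reduce each non-empty partition locally, then reduce the partial results."""
    partials = [reduce(func, part)
                for part in slice_partitions(data, num_partitions) if part]
    return reduce(func, partials)


data = [1, 2, 3, 4, 5]
print(slice_partitions(data, 2))         # [[1, 2], [3, 4, 5]]
print(partitioned_reduce(data, add, 1))  # 15
print(partitioned_reduce(data, add, 2))  # 15: + gives the same answer
print(partitioned_reduce(data, sub, 1))  # -13
print(partitioned_reduce(data, sub, 2))  # 5: - depends on the partitioning
```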


In [None]:
data = (1,2,3,4)
rdd = sc.parallelize(data)
rdd2 = rdd.map(lambda x: x*2)
print(rdd2.collect())
sum_val = rdd2.reduce(lambda a, b: a+b)
print('sum = {0}'.format(sum_val))
mul_val = rdd2.reduce(lambda a, b: a*b)
print('mul = {0}'.format(mul_val))

[2, 4, 6, 8]
sum = 20
mul = 384


## RDD Operations - aggregate

Aggregate is an action operation, *rdd.aggregate(zeroValue, seqOp, combOp)*, that:
- uses *seqOp* to fold the elements of each partition into an accumulator that starts from *zeroValue*; the accumulator may have a different type from the RDD elements, which is how the elements are turned into the type of the output value
- then merges the per-partition accumulators using *combOp*

Note that reduce is a simple form of the aggregate operation.
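The two phases can be modelled in plain Python. In this sketch, aggregate is a hypothetical helper and the RDD is represented as a list of partitions. seqOp folds each partition starting from zeroValue, and combOp then merges the partial results, again starting from zeroValue. PySpark's RDD.aggregate also uses zeroValue in that final merge, which is why zeroValue should be a neutral element (0 for +, an empty string for concatenation).

```python
from functools import reduce


def aggregate(partitions, zero_value, seq_op, comb_op):
    """Model of rdd.aggregate over a list of partitions (each a list)."""
    # Phase 1: inside each partition, fold elements into an accumulator.
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    # Phase 2: merge per-partition accumulators, starting from zero_value again.
    return reduce(comb_op, partials, zero_value)


partitions = [[1, 2], [3, 4]]  # the RDD [1, 2, 3, 4] split into 2 partitions

# Same as rdd.reduce(lambda a, b: a + b)
print(aggregate(partitions, 0, lambda acc, e: acc + e, lambda a, b: a + b))  # 10

# (sum, count): the accumulator has a different type from the elements
print(aggregate(partitions, (0, 0),
                lambda acc, e: (acc[0] + e, acc[1] + 1),
                lambda a, b: (a[0] + b[0], a[1] + b[1])))  # (10, 4)

# A non-neutral zero value is counted once per partition plus once more
print(aggregate(partitions, 1, lambda acc, e: acc + e, lambda a, b: a + b))  # 13
```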

In [None]:
rdd.collect()

[1, 2, 3, 4]

The following aggregate operation is equivalent to *rdd.reduce(lambda a, b: a+b)*, because the output value is an integer, the same type as the RDD elements.

In [None]:
rdd.aggregate(0,
              lambda zero, e: zero+e,
              lambda a, b: a+b)

10

In [None]:
rdd.aggregate(0,
              lambda zero, e: zero+1,
              lambda a, b: a+b)

4

The following aggregate operation returns an ordered pair (x, y), where
- x is the sum of all elements in RDD
- y is the count of all elements in RDD

In [None]:
rdd.aggregate((0, 0),
              lambda zero, e: (zero[0]+e, zero[1]+1),
              lambda a, b: (a[0]+b[0], a[1]+b[1]))

(10, 4)
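A common use of the (sum, count) pair is computing a mean. The plain-Python sketch below uses hypothetical partitions of uneven size. It shows why the pair is carried through the aggregation instead of averaging each partition and then averaging the averages.

```python
from functools import reduce

partitions = [[1, 2], [3, 4, 5]]  # hypothetical uneven partitions

# Wrong: the average of per-partition averages weights both partitions equally
partition_means = [sum(p) / len(p) for p in partitions]
print(sum(partition_means) / len(partition_means))  # 2.75

# Right: carry (sum, count) pairs and divide once at the end
pairs = [(sum(p), len(p)) for p in partitions]
total, count = reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]), pairs)
print(total / count)  # 3.0
```

With the RDD from the cell above, the same idea is s, c = rdd.aggregate(...) followed by s / c, which gives 10 / 4 = 2.5.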

In [None]:
lines.collect()

['line 1', '2', 'more lines', 'last line']

The following aggregate operation returns an ordered pair (x, y), where
- x is the concatenation of all elements in RDD
- y is the sum of the length of all elements in RDD

In [None]:
lines.aggregate(("", 0),
                lambda zero, e: (zero[0]+e, zero[1]+len(e)),
                lambda a, b: (a[0]+b[0], a[1]+b[1]))

('line 12more lineslast line', 26)

In [None]:
lines.collect()

['line 1', '2', 'more lines', 'last line']

In [None]:
lines.reduce(lambda s1, s2: s1+s2)

# Example: Word Count

Word count is the "Hello World" of big data programming. In this example, we count the number of occurrences of each word in a text file, "star-wars.txt". Note that the code is not perfect: it does not yet handle punctuation, plural nouns, or past-tense verbs properly.

Before running this example, make sure that the data file 'star-wars.txt' has been uploaded to the content folder (/content) of this Colab session.
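One possible fix for the punctuation problem is a regular expression that keeps only runs of letters and apostrophes. It is sketched here as a plain-Python function that could be passed to flatMap in place of line.split(). tokenize is a hypothetical helper, and it still does not handle plurals or verb tenses.

```python
import re

WORD_RE = re.compile(r"[a-z']+")


def tokenize(line):
    """Lowercase a line and return its words, dropping punctuation and digits."""
    words = WORD_RE.findall(line.lower())
    # Trim stray quote marks at the edges and drop tokens that were only quotes
    return [w.strip("'") for w in words if w.strip("'")]


print(tokenize("May the Force be with you!"))  # ['may', 'the', 'force', 'be', 'with', 'you']
print(tokenize("'Don't' -- he said. 1977"))    # ["don't", 'he', 'said']
```

In the notebook this would read words = all_lowers.flatMap(tokenize).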

### Reading from "star-wars.txt"

First, read the content of the file using sc.textFile().  This creates an rdd whose elements are lines in the input file.

In [None]:
sw = sc.textFile('star-wars.txt')
for line in sw.take(10):
    print('{0}: [{1}]'.format(len(line), line))

In [None]:
print('Total = {0} lines'.format(sw.count()))

### Data Cleansing

Remove all blank lines and convert all characters to lowercase.

In [None]:
nb_lines = sw.filter(lambda line: len(line.strip()) > 0)  # strip() so whitespace-only lines also count as blank
print('Non blank line = {0} lines'.format(nb_lines.count()))
all_lowers = nb_lines.map(lambda line: line.lower())
for line in all_lowers.take(10):
    print('{0}: [{1}]'.format(len(line), line))

### Data Preparation - from lines to words

We can now split each line into words. If we use *map*, each element of the output RDD is the list of words in one line. If we use *flatMap* instead, the per-line lists are flattened into a single RDD of words.

To see the difference, compare the results of map and flatMap below:

In [None]:
words_map = all_lowers.map(lambda line: line.split())
for l in words_map.take(5):
    print(l)

In [None]:
words = all_lowers.flatMap(lambda line: line.split())
for w in words.take(10):
    print(w)
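In plain Python, this is the difference between a list of lists and a flattened list. In this sketch, itertools.chain.from_iterable plays the role of flatMap's flattening step on a couple of made-up lines.

```python
from itertools import chain

lines = ["a long time ago", "in a galaxy"]

# Like rdd.map(lambda line: line.split()): one list per line
words_map = [line.split() for line in lines]

# Like rdd.flatMap(lambda line: line.split()): one flat sequence of words
words_flat = list(chain.from_iterable(line.split() for line in lines))

print(words_map)   # [['a', 'long', 'time', 'ago'], ['in', 'a', 'galaxy']]
print(words_flat)  # ['a', 'long', 'time', 'ago', 'in', 'a', 'galaxy']
print(len(words_map), len(words_flat))  # 2 7
```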

### Counting Occurrences

To count the occurrences of each word, we first transform each word into a (key, value) pair of (word, 1).

In [None]:
words.map(lambda word: (word, 1)).take(5)

After this transformation, we can count the occurrences using *reduceByKey*, which applies reduce(func) to all values that share the same key.

In [None]:
mappers = words.map(lambda word: (word, 1))
counts = mappers.reduceByKey(lambda x, y: x+y)
for wc in counts.take(10):
    print(wc)
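reduceByKey can be modelled in plain Python as two steps. First, each partition combines the values of its own keys (a map-side combine). Then the partial results for each key are merged across partitions. The helper and partitions below are hypothetical, and the sketch leaves out the shuffle that moves keys between machines. The real counts RDD is not sorted, so take(10) returns ten arbitrary words. In PySpark, counts.takeOrdered(10, key=lambda wc: -wc[1]) would return the ten most frequent ones instead.

```python
def reduce_by_key(partitions, func):
    """Model of rdd.reduceByKey(func) over a list of partitions of (key, value) pairs."""
    # Step 1: combine values per key inside each partition
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)
    # Step 2: merge the per-partition results for each key
    merged = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return sorted(merged.items())  # sorted only to make the output deterministic


partitions = [
    [("the", 1), ("force", 1), ("the", 1)],
    [("force", 1), ("jedi", 1)],
]
print(reduce_by_key(partitions, lambda x, y: x + y))
# [('force', 2), ('jedi', 1), ('the', 2)]
```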

In [None]:
spark.stop()