
# Introduction to Apache Spark / PySpark

These examples are based on [Spark 3.3.0](https://spark.apache.org/docs/3.3.0/)  

Reference/API links:

*   [Apache Spark Quick Start](https://spark.apache.org/docs/3.3.0/quick-start.html)
*   [PySpark v3.3.0 API](https://spark.apache.org/docs/3.3.0/api/python/reference/index.html)
*   [RDD Programming Guide](https://spark.apache.org/docs/3.3.0/rdd-programming-guide.html)
*   [Spark SQL Programming Guide](https://spark.apache.org/docs/3.3.0/sql-programming-guide.html)

In [None]:
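# Install PySpark; Spark runs on the JVM, so a Java runtime is installed as well
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
# JAVA_HOME tells PySpark which Java installation to launch
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"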


# Imports / Starter Example

In [None]:
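from pyspark.sql import SparkSession
from pyspark.sql import types as sparktypes
from pyspark.sql.functions import col

# getOrCreate() reuses an existing session, so re-running this cell does not fail
# (calling SparkContext() twice raises "Cannot run multiple SparkContexts at once")
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext  # SparkContext used for the RDD examples below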

In [None]:
# Create a Resilient Distributed Dataset (RDD) from a sequence of integers,
# then apply filter() and map() transformations and a reduce() action

# Function used in the filter() transformation: keep values of 20 or more
def filterSmall(x):
    return x >= 20

# Function used in the map() transformation
def mapSquare(x):
    return x * x

# Function used in the reduce() action
def reduceSum(x, y):
    return x + y

rdd = sc.parallelize(range(100))         ## create an RDD of the 100 integers 0 to 99

#print(rdd.filter(filterSmall).collect())

out1 = rdd.filter(filterSmall).map(mapSquare)  ## transformations (lazy: nothing runs yet)
out2 = out1.reduce(reduceSum)                  ## action: triggers the computation

print(out1.collect())  ## print first output (squares of all numbers 20 and above)
print(out2)            ## print second output (sum of the squares in the first output)
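
Because Spark simply ships these ordinary Python functions to its workers, the same filter → map → reduce chain can be sanity-checked locally with Python's built-in `filter()` and `map()` and `functools.reduce()`. A minimal sketch (it redefines the three helpers so it runs on its own, without Spark):

```python
from functools import reduce

# Same helper logic as the Spark cell above
def filterSmall(x):
    return x >= 20

def mapSquare(x):
    return x * x

def reduceSum(x, y):
    return x + y

# Local, single-machine equivalent of rdd.filter(...).map(...) and .reduce(...)
local_out1 = list(map(mapSquare, filter(filterSmall, range(100))))
local_out2 = reduce(reduceSum, local_out1)

print(local_out1[:3])  # first few squares: 20*20, 21*21, 22*22
print(local_out2)
```

The sum of the squares of 20 through 99 is 325880, so `out2` in the Spark cell should print the same value.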


In [None]:
# download a sample access log for use in demos below
!rm -f apache.access.log
!wget -q https://raw.githubusercontent.com/databricks/reference-apps/master/logs_analyzer/data/apache.access.log

# Apache HTTP Log Example - Resilient Distributed Dataset (RDD)

A SparkContext instance can create RDDs from many kinds of sources, such as in-memory Python collections (`parallelize()`), text and CSV files (`textFile()`), and Hadoop input formats.

In [None]:
# Find the top 10 clients, map/reduce style, using RDD transformations
access_log_rdd = (sc.textFile("apache.access.log")
                  .map(lambda line: (line.split(" ")[0], 1))            # field 0 = client address; emit (client, 1)
                  .reduceByKey(lambda count1, count2: count1 + count2)  # sum the 1s for each client
                  .sortBy(lambda t: -t[1]))                             # highest request count first

print("Number of distinct client hostnames:")
print(access_log_rdd.count())

print("Top 10 client hostnames:")
print(access_log_rdd.take(10))
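
To see what `reduceByKey()` and `sortBy()` compute in the cell above, here is a plain-Python sketch of the same steps on a few made-up log lines (the addresses and paths are hypothetical, not taken from `apache.access.log`). A dictionary accumulates the per-key sum that Spark computes in parallel across partitions:

```python
from collections import defaultdict

# Hypothetical log lines in the Apache access log layout
sample_lines = [
    '10.0.0.1 - - [07/Mar/2004:16:05:49 -0800] "GET /a HTTP/1.1" 200 100',
    '10.0.0.2 - - [07/Mar/2004:16:06:51 -0800] "GET /b HTTP/1.1" 200 200',
    '10.0.0.1 - - [07/Mar/2004:16:10:02 -0800] "GET /c HTTP/1.1" 404 50',
]

# map: emit (client, 1) for every line, like the lambda in the RDD pipeline
pairs = [(line.split(" ")[0], 1) for line in sample_lines]

# reduceByKey: combine all values that share a key by adding them
counts = defaultdict(int)
for host, one in pairs:
    counts[host] += one

# sortBy(lambda t: -t[1]): highest count first
ranked = sorted(counts.items(), key=lambda t: -t[1])
print(ranked)
```

Because addition is associative and commutative, summing with `+=` gives the same totals as Spark's pairwise `count1 + count2`, and `len(counts)` corresponds to `access_log_rdd.count()`. For a top-N query, `RDD.takeOrdered(n, key=...)` is one alternative to a full `sortBy()` followed by `take()`.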


# Apache HTTP Log Example - DataFrame

A DataFrame is equivalent to a relational table in Spark SQL and can be created from a variety of input formats (CSV, JSON, relational databases, etc.) using the SparkSession.

In [None]:
access_log_df = spark.read.text("apache.access.log")

access_log_df.show(truncate=False)
access_log_df.printSchema()

In [None]:
access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")

access_log_df.show(truncate=False)
access_log_df.printSchema()

In [None]:
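from pyspark.sql.functions import split

access_log_df = spark.read.options(delimiter=" ").csv("apache.access.log")

# With a space delimiter, the quoted request ("GET /path HTTP/1.1") lands in _c5 as a single field,
# and the bracketed timestamp is split into _c3 (date/time) and _c4 (time zone offset)
named_df = access_log_df.select(col('_c0').alias('host'),
                                col('_c3').alias('timestamp'),
                                split(col('_c5'), ' ').getItem(1).alias('path'),  # keep only the URL path
                                col('_c6').cast('integer').alias('status'),
                                col('_c7').cast('integer').alias('content_size'))
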
named_df.show(truncate=False)
named_df.printSchema()

In [None]:
named_df.createOrReplaceTempView("log")
sql_df = spark.sql("SELECT * FROM log WHERE status = 404")

sql_df.show(truncate=False)
sql_df.printSchema()

## What About Datasets?

Added in Spark 1.6, a **Dataset** is a distributed collection of data that combines the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (`map`, `flatMap`, `filter`, etc.). The Dataset API is available in Scala and Java. Python does not support the Dataset API, but because of Python’s dynamic nature, many of its benefits are already available (e.g., you can naturally access a field of a row by name with `row.columnName`). The case for R is similar.

A **DataFrame** is a Dataset organized into named columns. ([source](https://spark.apache.org/docs/3.1.1/sql-programming-guide.html)) 

# Reporting Tasks (from Lab 2)


1. Most popular URL paths (top 15)
2. Request count for each HTTP response code, sorted by response code
3. Request count for each calendar month and year, sorted chronologically
4. Total bytes sent to the client with a specified hostname or IPv4 address (you may hard code an address)
5. Based on a given URL (hard coded), compute a request count for each client (hostname or IPv4) who accessed that URL, sorted by request count, highest to lowest
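
Several of these tasks need more than the first space-separated field of each line: the URL path, the status code, the byte count, and the month and year. One option (an assumption, not an approach required by Lab 2) is a regular-expression parser for the Apache Common Log Format. The function name `parse_line` and the sample line are hypothetical:

```python
import re
from datetime import datetime

# Common Log Format: host ident user [timestamp] "method path protocol" status size
LOG_PATTERN = re.compile(
    r'^(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'
)

def parse_line(line):
    """Hypothetical helper: return a dict of log fields, or None if the line does not match."""
    match = LOG_PATTERN.match(line)
    if match is None:
        return None
    host, timestamp, method, path, protocol, status, size = match.groups()
    return {
        "host": host,
        # e.g. 07/Mar/2004:16:05:49 -0800 (%b assumes English month abbreviations)
        "timestamp": datetime.strptime(timestamp, "%d/%b/%Y:%H:%M:%S %z"),
        "method": method,
        "path": path,
        "status": int(status),
        # "-" means no body was sent; treated here as 0 bytes
        "content_size": 0 if size == "-" else int(size),
    }

sample = '10.0.0.1 - - [07/Mar/2004:16:05:49 -0800] "GET /index.html HTTP/1.1" 200 1024'
record = parse_line(sample)
print(record["host"], record["path"], record["status"], record["content_size"])
print((record["timestamp"].year, record["timestamp"].month))
```

Inside Spark, a function like this could be applied with `sc.textFile("apache.access.log").map(parse_line).filter(lambda r: r is not None)`, so lines that do not match the pattern are dropped. A `(year, month)` tuple taken from the parsed timestamp sorts chronologically, which is the ordering task 3 asks for.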


# (A) RDD Implementations

Perform reporting tasks 1-5 using RDD transformations

[RDD APIs PySpark v3.3.0](https://spark.apache.org/docs/3.3.0/api/python/reference/pyspark.html#rdd-apis)

In [None]:
# RDD implementation
# (1) Most popular URL paths (top 15)

In [None]:
# RDD implementation
# (2) Request count for each HTTP response code, sorted by response code

In [None]:
# RDD implementation
# (3) Request count for each calendar month and year, sorted chronologically

In [None]:
# RDD implementation
# (4) Total bytes sent to the client with a specified hostname or IPv4 address (you may hard code an address)

In [None]:
# RDD implementation
# (5) Based on a given URL (hard coded), compute a request count for each client (hostname or IPv4) who accessed that URL, sorted by request count, highest to lowest

# (B) DataFrame Implementations

Perform reporting tasks 1-5 using Spark's DataFrame API

[DataFrame API PySpark v3.1.1](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.html#pyspark.sql.DataFrame)

Please Note: Spark SQL **is not** permitted for these exercises. You must use the Spark DataFrame API. 

In [None]:
# DataFrame implementation
# (1) Most popular URL paths (top 15)

In [None]:
# DataFrame implementation
# (2) Request count for each HTTP response code, sorted by response code

In [None]:
# DataFrame implementation
# (3) Request count for each calendar month and year, sorted chronologically

In [None]:
# DataFrame implementation
# (4) Total bytes sent to the client with a specified hostname or IPv4 address (you may hard code an address)

In [None]:
# DataFrame implementation
# (5) Based on a given URL (hard coded), compute a request count for each client (hostname or IPv4) who accessed that URL, sorted by request count, highest to lowest