# PySpark

## Big Data for Data Engineers
Big data is all around us! Learn how to harness it here.


### What is PySpark?
PySpark is the Python API for Apache Spark. Apache Spark is an open-source framework for distributed computing. Distributed computing makes handling big data with a little (or medium) computer possible. This unit will cover everything you need to know to get started managing big data.

## What will Big Data with PySpark for Data Engineers cover?
- Conceptual foundations of big data
- Writing programs in PySpark
- Using SQL-style queries in PySpark
- Handling data with Resilient Distributed Datasets (RDDs)
- Handling data with PySpark DataFrames
- Generating summary reports from big data datasets


## What is Big Data?
After the massive growth of the internet in the mid-2000s, many analysts in the industry were struggling to handle their own data. The phrase “big data” came into use when referring to a dataset that was unmanageable with current business intelligence tools.
- Sources of data have continued to grow
- This growth is outpacing the growth of computing power

This section covers:

- Big data: Greater than our available computing power can handle
- How we can describe it with the 3 Vs (volume, velocity, variety)
- Applications of big data


### Data is Everywhere
We generate data from all kinds of activities:

    *   a transaction at a local store
    *   the websites we visit
    *   the location of our cell phone
    *   the tweets written on Twitter



- At the time of this writing, an average of 500 million tweets are written on Twitter each day. A person at Twitter can’t analyze this data! Data of this size is often referred to as big data.

- Big data is any data that is too big for a typical modern computer to process and analyze.

- However, the definition of big data is relative to the amount of computing power we have available.


For example:

- Most current personal computers have somewhere between 8-32 GB of random access memory (RAM) available for data processing. That means, from the perspective of a personal computer, any dataset larger than 10-20 GB might be too large to process.

- A large enterprise can take advantage of larger computing resources (e.g., a warehouse of servers or cloud computing), so 100+ GB might be the upper limit for the size of data.

- Data measured in terabytes (1 TB = 1000 GB) is at the upper end of what is typically worked with at this time.





### The 3 Vs
We can define big data using the features that make it hard to handle in the first place. These features are generally known as the three Vs:

    *   Volume

Big data is “big”: the data is bigger than the amount of available computing power. Zettabytes of data are created every year (a zettabyte is 1 billion terabytes).

    *   Velocity

Big data has velocity: it is growing quickly.
Through means like apps and sensors, data becomes faster, cheaper, and easier to collect automatically and continuously.
If data were simply large, but slow-changing, then over time our computing power would eventually catch up to the size of the data.

    *   Variety

Big data also has variety: it comes in different, complex forms.
In today’s data ecosystem, data comes in many more formats than the data tables of old. Data can be categorized as:
- structured (data tables with rows and columns)
- semi-structured (think JSON files with nested data)
- unstructured (audio, image, and video data).

Each of these data formats presents different challenges in processing.
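As a rough illustration of the three categories, the sketch below reads one structured record, one semi-structured record, and one unstructured blob using only Python's standard library. All sample values and field names are invented for the example.

```python
import csv
import io
import json

# Structured: every row has the same columns.
csv_text = "name,state\nChris,CA\nJake,NY\n"
rows = list(csv.DictReader(io.StringIO(csv_text)))

# Semi-structured: fields can nest and can differ between records.
json_text = '{"name": "Chris", "devices": [{"type": "phone", "os": "android"}]}'
record = json.loads(json_text)

# Unstructured: raw bytes with no schema (here, four made-up bytes).
blob = bytes([0x89, 0x50, 0x4E, 0x47])

print(rows[0]["state"])              # simple column lookup
print(record["devices"][0]["type"])  # lookup through nested structure
print(len(blob))                     # all we know without a decoder
```

The structured record can be addressed by column name, the semi-structured one needs a path through nested fields, and the unstructured one needs a format-specific decoder before any analysis can start.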

### Big Data Applications
What can we do with big data?

When do we run into big data in the real world? Let’s explore a few examples of big data applications across various industries.

    *   Social Media
With an average of 500 million tweets per day, Twitter data would definitely qualify as big data. Despite the massive amount of data at its disposal, Twitter provides analytics for each user with the ability to dive into historical Tweet activity and identify trends. In order to provide this for each Twitter user, they must be using some kind of big data toolkit to store and analyze the data.

    *   Healthcare
If we look at the healthcare industry, a rising trend that many providers want to enable is known as evidence-based medicine. Healthcare providers want to combine data from several sources, including cell phone apps, diagnostic tests, and previous medical records, to give recommendations for each patient. Providers hope this will avoid expensive and unnecessary tests and improve patient outcomes. In this case, there is a lot of data from many different sources and formats, but the efforts could provide a huge impact for their patients.

    *   Finance
In the financial industry, credit card companies aim to reduce the amount of fraudulent transactions, as these cost money and cause hardship for their customers. Using different tools, credit card companies are able to analyze every single credit card transaction and use machine learning models to identify transactions that could be fraudulent. This saves a massive amount of money for themselves and for their customers!

    *   Final Thoughts
No matter which industry we look at, we will find numerous examples of big data everywhere, and there are more every day. However, we need to be aware that big data often comes with both big challenges and big effects. With the right tools and techniques, we can begin to extract value from big data while being mindful of both its limitations and impacts.


## Bias in Data
Biases are systematic errors in thinking influenced by cultural and personal experiences.

    *   Biases distort our perception and cause us to make incorrect decisions.
    *   Humans have many biases, both implicit and explicit.
    
It helps to understand the different types of bias that show up at each stage of analyzing data. The following types of bias impact data analysis and data-driven decision-making.


    1.   Automation bias

Automation bias stems from the idea that computers or machines are

              *   more trustworthy than humans
              *   more objective than humans

Automation bias is at the root of why people follow their GPS into trouble, even when contradictory information is available.
Computers, data, and algorithms are not actually completely objective:

- Human biases can be encoded into the algorithms.
- Data analysis is not immune to bias.
- We should look at more information sources when we evaluate data analysis results or reports.

In the same way, we should pay attention to other information streams (our eyes and ears) when we drive with GPS.


    2.   Bias in building and optimizing algorithms

Algorithmic bias arises when an algorithm produces systematic and repeatable errors that lead to unfair outcomes, such as privileging one group over another.

Algorithmic bias can be initiated through selection bias and then reinforced and perpetuated by other bias types.

    3.   Bias in interpreting results and drawing conclusions
Bias also influences the final stages of data analysis: interpreting results and drawing conclusions. The following bias types are ones we should watch out for when evaluating or generating data reports:

- Confirmation bias is our tendency to seek out information that supports our views.

    *   It influences data analysis when we consciously or unconsciously interpret results in a way that supports our original hypothesis.
    
    *   To limit confirmation bias, clearly state hypotheses and goals before starting an analysis, and then honestly evaluate how they influenced our interpretation and reporting of results.

- Overgeneralization bias is inappropriately extending observations made with one dataset to other datasets, leading to overinterpreting results and unjustified extrapolation. To limit overgeneralization bias, be thoughtful when interpreting data, only extend results beyond the dataset used to generate them when it is justified, and only extend results to the proper population.

- Reporting bias is the human tendency to only report or share results that affirm our beliefs or hypotheses, also known as “positive” results. Editors, publishers, and readers are also subject to reporting bias as positive results are published, read, and cited more often. To limit reporting bias, report negative results and cite others who do, too.

       Conclusions
Data and machine learning algorithms are now ubiquitous. They influence decisions about
       - who is hired or fired,
       - who is accepted into schools,
       - who is allowed to rent houses,
       - which neighborhoods are more heavily policed,
       - who is granted parole.

Therefore, we must recognize that data and algorithms can be biased, just like the humans who create and train them. Learning more about the types of bias that influence how algorithms function will improve our ability to perform and interpret data analyses and will help us make more informed decisions.


## Big Data Storage and Computing
Learn about the challenges of storing and analyzing big data


#### Big Data Challenges
Every single day, over 2.5 × 10^18 bytes of data are created, from sources such as:

         * transactional sales data
         * Internet of Things (IoT) devices

These data sources grow in both size and velocity at a rapid rate. When thinking about the massive scale of data, two questions come up:


1. Where are all of these data stored?

We usually picture a basic dataset as a table in Excel or an equivalent application. With big data, if we pull an entire dataset into RAM on a single processing machine for computation, the machine will either crash or take too long to process it, making analysis impossible.

2. How do we get enough computing power to process it?



#### Big Data Storage
* A popular solution for big datasets is a distributed file system on a network of hardware called a cluster.

* A cluster is a group of several machines called nodes, with a cluster manager node and multiple worker nodes.

![Big Data Storage](https://raw.githubusercontent.com/Hardi-Lore/codeacademy-notebook/main/pictures/Big%20Data%20Storage.png)


- The cluster manager manages resources and sends commands to the worker nodes that store the data.
- Data saved on worker nodes are replicated multiple times for fault tolerance.

    *   This allows access to the complete dataset even in the event that one of the worker nodes goes offline.

    *   This type of file storage system is also easily scalable, as additional worker nodes can be added when more capacity is needed.
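To make the replication idea concrete, here is a minimal plain-Python simulation. It is not HDFS code: the round-robin placement rule, the node names, and the helpers `place_blocks` and `readable_blocks` are all assumptions made for the illustration.

```python
def place_blocks(blocks, nodes, replication=2):
    """Assign each block to `replication` distinct nodes, round-robin."""
    placement = {node: set() for node in nodes}
    for i, block in enumerate(blocks):
        for r in range(replication):
            node = nodes[(i + r) % len(nodes)]
            placement[node].add(block)
    return placement

def readable_blocks(placement, offline=()):
    """Return the blocks still reachable when the `offline` nodes are down."""
    alive = [held for node, held in placement.items() if node not in offline]
    return set().union(*alive) if alive else set()

blocks = ["b0", "b1", "b2", "b3", "b4", "b5"]
nodes = ["worker-1", "worker-2", "worker-3"]
placement = place_blocks(blocks, nodes, replication=2)

one_down = readable_blocks(placement, offline={"worker-2"})
two_down = readable_blocks(placement, offline={"worker-2", "worker-3"})
print(one_down == set(blocks))  # True: every block has a surviving copy
print(two_down == set(blocks))  # False: two copies cannot survive two failures
```

With two copies of every block, the dataset survives the loss of any single worker in this toy setup, but not the loss of two.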


##### Hadoop Distributed File System (HDFS)

    Cluster node <- Cluster Manager <- MapReduce framework <- HDFS store data <- Apache

MapReduce is a computing framework for analyzing datasets housed on a distributed file system like HDFS. **MapReduce is a disk-oriented method**, which means that it writes data to disk in intermediate steps of analysis. While this method allows us to process data stored on the HDFS, it can still be a slow process for analyzing larger datasets.

A common framework for a cluster system is the Hadoop Distributed File System (HDFS), which is part of a set of tools distributed by Apache.

- HDFS was designed to store vast amounts of data to be processed using another framework called MapReduce.

- HDFS requires a specific hardware configuration that can be a costly barrier to entry. For this reason, cloud-hosted HDFS is a popular fix. Providers such as

    *   Microsoft Azure
    *   Amazon Web Services (AWS)

  offer cloud-based HDFS solutions, allowing companies to outsource a system’s setup and hardware management for a fixed monthly cost.




1. HDFS solutions both store and process data on each worker node, so they ensure that we have enough computing power to tackle our data problems.
1. When data grows in size, the number of nodes can be increased to add more storage and computing power.
1. This is advantageous for scaling but can become expensive as the number of nodes increases.

MapReduce was the standard for big data processing for a while, but over time it could not keep up with the rate at which data were growing and changing. It works in two steps:
* The map function collects specifically defined elements of data from each node as key-value pairs.
* The reduce function is an analytical function applied to the values collected for each key, and its result is returned as output.
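The two steps can be sketched in plain Python with a word count. This is only an illustration of the idea, not Hadoop's API: the sample lines and the helper names `map_phase`, `shuffle`, and `reduce_phase` are invented, and the grouping step that sits between map and reduce is written out explicitly.

```python
from collections import defaultdict
from functools import reduce

# Each inner list stands in for the lines stored on one worker node.
node_data = [
    ["big data", "big cluster"],
    ["data node"],
]

def map_phase(lines):
    """Emit a (word, 1) key-value pair for every word on one node."""
    return [(word, 1) for line in lines for word in line.split()]

def shuffle(pairs):
    """Group the emitted values by key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped

def reduce_phase(grouped):
    """Combine the values of each key into a single result."""
    return {key: reduce(lambda a, b: a + b, values)
            for key, values in grouped.items()}

mapped = [pair for lines in node_data for pair in map_phase(lines)]
word_counts = reduce_phase(shuffle(mapped))
print(word_counts)
```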

Apache Spark is a faster alternative for processing. Spark’s main benefit is the ability to process data in each node’s memory instead of processing on disk as MapReduce does. This provides much better performance and unlocks new capabilities for working with big data.

    Cluster node <- MapReduce framework <- Cluster Manager <- HDFS store data <- Apache


In order to get value out of big data, we need to utilize the best strategies for storing our data and providing computing power for our analysis. With these in place, we can be ready to scale and grow our analyses!



### Questions


1. MapReduce is a framework for big data computing developed to extract and analyze data across a Hadoop Distributed File System (HDFS) cluster.

MapReduce for big data computing is performed simultaneously on each worker node before being sent back to the manager node, allowing for a much faster compute time.


2. Big data may be described by its extremely large volume, which is relative to our modern computing power.

We use velocity to describe big data because it is always growing and outpacing our computing power. Big data comes in a variety of forms, like structured and unstructured.


3. Hadoop Distributed File System (HDFS)  is implemented on a cluster of computers: a cluster manager and several worker nodes to house the data and computing resources together.


4. Which statement most accurately describes big data?

Big data refers to data that is too large to handle with our current computing power and is relative to the system’s total available Random Access Memory (RAM).
Data is “big” when it is bigger than we can handle, and is relative to total RAM size.



## What is Spark?
Learn about Apache Spark and its application for big data analysis.

A popular choice for storing big data is the Hadoop Distributed File System (HDFS), which splits up a dataset and stores it across multiple worker nodes in a cluster.

MapReduce is a computing framework for analyzing datasets housed on a distributed file system like HDFS. MapReduce is a disk-oriented method, which means:

- MapReduce writes data to disk in intermediate steps of analysis. While this method allows us to process data stored on the HDFS, it can still be a slow process for analyzing larger datasets.

Spark itself is written in Scala, which can be a learning obstacle. PySpark is an API developed to minimize this obstacle by allowing programmers to write Python syntax to build Spark applications. There are also APIs for Java and R.

### Apache Spark and PySpark
Spark is an analytics engine built to meet growing big data processing needs. It was:

    *   originally developed at UC Berkeley
    *   donated to the open-source Apache Software Foundation.
    
Spark was designed as a solution for processing big datasets and was specifically developed to build data pipelines for machine learning applications.

Like MapReduce, Spark does not have its own file storage system and is designed to be used with distributed file systems like HDFS. Spark can also be run on a single node (single computer) in stand-alone mode with a non-distributed dataset.

![Apache Spark and PySpark](https://raw.githubusercontent.com/Hardi-Lore/codeacademy-notebook/main/pictures/Apache_Spark_and_PySpark.png)

- Spark uses the RAM of each cluster node in unison, harnessing the power of multiple computers.
- Spark applications can execute analyses up to 100 times faster than MapReduce because Spark caches data and intermediate tables in RAM.
- However, as datasets become larger, the advantage of using RAM decreases and can disappear altogether.


### How Spark Works

- The Spark driver is used to create a Spark session
    *   The Spark driver is the entry point of a Spark application.
- The driver program communicates with the cluster manager to create resilient distributed datasets (RDDs).
    *   To create an RDD, the data is divided up and distributed across worker nodes in a cluster.
    *   Copies of the RDD across the nodes ensure that RDDs are fault-tolerant, so information is recoverable in the event of a failure.
- Two types of operations can be performed on RDDs:

  1. Transformations manipulate RDDs on the cluster.
  2. Actions return a computation back to the main driver program.

![How_Spark_Works](https://raw.githubusercontent.com/Hardi-Lore/codeacademy-notebook/main/pictures/How_Spark_Works.png)


The cluster manager determines the resources that the Spark application requires and assigns a specific number of worker nodes as executors to handle the processing of RDDs.

Spark can be run on top of cluster managers including:

    *  Hadoop’s YARN
    *  Apache Mesos.

### Spark Modules
The driver program is the core of the Spark application, but there are also modules that have been developed to enhance the utility of Spark. These modules include:

* Spark SQL: an API that converts SQL queries and actions into Spark tasks to be distributed by the cluster manager. This allows for the integration of existing SQL pipelines without redevelopment of code and subsequent testing required for quality control.
* Spark Streaming: a solution for processing live data streams that creates a discretized stream (DStream) of RDD batches.
* MLlib and ML: machine learning modules for designing pipelines used for feature engineering and algorithm training. ML is the DataFrame-based improvement on the original MLlib module.
* GraphX: a robust graphing solution for Spark. More than just visualizing data, this API converts RDDs to resilient distributed property graphs (RDPGs) which utilize vertex and edge properties for relational data analysis.


### Spark Limitations
Spark is a powerful tool for working with big data, but it does come with some limitations:

**Expensive hardware requirements**: Spark provides a solution for more time-efficient analyses of large distributed datasets, but Spark analyses are much less cost-effective. The costs associated with Spark come from the need for a lot of RAM built into the worker nodes of a cluster. RAM is much more expensive than disk storage.

**Real-time processing is not possible**: Spark Streaming offers near real-time data processing, but true real-time data analysis is not supported.

**Manual optimization is required**: The benefits and power of Spark must be optimized by the developer, which requires an advanced understanding of the program and backend, creating a technical hurdle for developers.

### Cases
TripAdvisor is an online travel site that sources, generates, and analyzes massive amounts of data per day. All of the data processing for TripAdvisor is done using Spark. Natural language processing of reviews is an example shared by TripAdvisor in an article on their site. https://www.tripadvisor.com/engineering/using-apache-spark-for-massively-parallel-nlp/

MyFitnessPal is a popular application for smartwatches and smartphones owned by Under Armour that tracks the diet and exercise of its users. The app utilizes Spark to analyze user data for their users and internal marketing demographic classification. You can read more about how MyFitnessPal uses Spark in this article by the Wall Street Journal. https://www.wsj.com/articles/BL-CIOB-7254

We can find many other examples of Spark use cases for commercial business and research projects in recent years.
There are other big data analysis solutions that are built using Spark code or using similar concepts. Some examples include:
- Delta Lake
- Apache Mesos
- Rumble (Apache)
- Databricks



## RDDs with PySpark

### Start Coding with PySpark

SparkSession is the entry point to Spark. There are many possible configurations for a SparkSession, but for now, we will simply start a new session and save it as spark:


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

We can use Spark with data stored on:
- a distributed file system (DFS or HDFS)
- your local machine (PC)



Without additional configurations, Spark defaults to local with the number of partitions set to the number of CPU cores on our local machine (often, this is 4).

In [None]:
print(spark)
#Output: <pyspark.sql.session.SparkSession object at 0x7f4330b163d0>

Within a SparkSession:

In [None]:
# default number of partitions; dataset_name is a placeholder for a local Python collection
rdd_par = spark.sparkContext.parallelize(dataset_name)


- `sparkContext` is the connection to the cluster and gives us the ability to create and transform RDDs.
- The `parallelize()` function creates an RDD from data saved locally.

  - We can specify the number of partitions as a second argument; 2-4 partitions per machine is the general recommendation.
  - If we do not, Spark defaults to the total number of CPU cores (4 in this example).

In [None]:
# with partition argument of 10
rdd_txt = spark.sparkContext.textFile("file_name.txt", 10)

If we are working with an external dataset, possibly a large one stored on a distributed file system, we can use `textFile()` to create an RDD.

Spark’s default is to partition the text file in 128 MB blocks, but we can manually set the number of partitions with the second argument of the function.
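As a back-of-the-envelope check, the default partition count can be estimated by dividing the file size by the block size. The helper `estimated_partitions` below is hypothetical (it is not a Spark function), and it assumes the 128 MB figure quoted above; the real count depends on the input format and cluster configuration.

```python
import math

BLOCK_SIZE_MB = 128  # block size quoted in the text; clusters may be configured differently

def estimated_partitions(file_size_mb, block_size_mb=BLOCK_SIZE_MB):
    """Rough estimate: one partition per started block, at least one."""
    return max(1, math.ceil(file_size_mb / block_size_mb))

print(estimated_partitions(1024))  # 8
print(estimated_partitions(300))   # 3
print(estimated_partitions(10))    # 1
```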




We can verify the number of partitions in rdd_txt using the following line:

In [None]:
rdd_txt.getNumPartitions()
# output: 10



Finally, we need to know how to end our SparkSession when we are finished with our work:


In [None]:
spark.stop()

### Transformations


Functions such as `map()` and `filter()` take an RDD as input and return an RDD as output. In Spark, functions with this behavior are called transformations. You can find more transformations in the official Spark documentation.

**A transformation performed on an RDD will always produce an RDD.**

https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations

- Many Spark functions we use on RDDs are similar to functions used in Python.
- We can use lambda expressions within RDD functions.

Example: a lambda expression that adds the number 1 to its input.

In [None]:
add_one = lambda x: x+1 # apply x+1 to x
print(add_one(10)) # this will output 11

PySpark functions that we may already be familiar with:

`map()` applies an operation to each element of the RDD, so it’s often constructed with a lambda expression.

This map example adds 1 to each element in our RDD:



In [None]:
rdd = spark.sparkContext.parallelize([1,2,3,4,5])
rdd.map(lambda x: x+1)
# resulting RDD contains [2,3,4,5,6]

If an **RDD contains tuples**, we can map the lambda expression to the elements at a specific index. The following code maps the lambda expression to just the first element of each tuple but keeps the others in the output:



In [None]:
# input RDD [(1,2,3),(4,5,6),(7,8,9)]
rdd.map(lambda x: (x[0]+1, x[1], x[2]))
# output RDD [(2,2,3),(5,5,6),(8,8,9)]

`filter()` allows us to remove or keep data conditionally. If we want to remove all NULL values (`None` in Python) in the following RDD, we can use a lambda expression in our filter:



In [None]:
# input RDD [1,2,None,4,5]
rdd.filter(lambda x: x is not None)
# output RDD [1,2,4,5]

We have one final note about transformations:

We can only view the contents of an RDD by using a special function like `collect()`, which will **return the data stored in the RDD** to the driver as a Python list. So to view the new RDD in the previous example, we would run the following:



In [None]:
rdd.filter(lambda x: x is not None).collect()
# Output: [1,2,4,5]

In [None]:
# Question
from pyspark.sql import SparkSession
student_data = [("Chris",1523,0.72,"CA"),  ("Jake", 1555,0.83,"NY"),  ("Cody", 1439,0.92,"CA"),
               ("Lisa",1442,0.81,"FL"), ("Daniel",1600,0.88,"TX"),  ("Kelvin",1382,0.99,"FL"),
               ("Nancy",1442,0.74,"TX"),  ("Pavel",1599,0.82,"NY"), ("Josh",1482,0.78,"CA"),
               ("Cynthia",1582,0.94,"CA")]
spark = SparkSession.builder.getOrCreate()
student_rdd = spark.sparkContext.parallelize(student_data)




In [None]:
rdd_transformation = student_rdd.map(lambda x: (x[0], x[1], int(x[2]*100), x[3]))
rdd_transformation.collect()
"""Output:
[('Chris', 1523, 72, 'CA'), ('Jake', 1555, 83, 'NY'), ('Cody', 1439, 92, 'CA'),
('Lisa', 1442, 81, 'FL'), ('Daniel', 1600, 88, 'TX'), ('Kelvin', 1382, 99, 'FL'),
('Nancy', 1442, 74, 'TX'), ('Pavel', 1599, 82, 'NY'), ('Josh', 1482, 78, 'CA'),
('Cynthia', 1582, 94, 'CA')]
"""




In [None]:
rdd_filtered = rdd_transformation.filter(lambda x: x[2]>80)
rdd_filtered.collect()
"""Output:
[('Jake', 1555, 83, 'NY'), ('Cody', 1439, 92, 'CA'), ('Lisa', 1442, 81, 'FL'),
('Daniel', 1600, 88, 'TX'), ('Kelvin', 1382, 99, 'FL'), ('Pavel', 1599, 82, 'NY'),
('Cynthia', 1582, 94, 'CA')]
"""

### Actions


Three commonly used actions are:

- `collect()`
- `take(n)`
- `reduce()`

Spark executes transformations only when an action is called to return a value. This delay is why we call Spark transformations lazy.

Spark will queue up the transformations to optimize and reduce overhead once an action is called. Let’s say that we wanted to apply a map and filter to our RDD:

In [None]:
rdd = spark.sparkContext.parallelize([1,2,3,4,5])
# chain the transformations, then call the action on the resulting RDD
rdd.map(lambda x: x+1).filter(lambda x: x>3).collect()
# output: [4, 5, 6]

Instead of following the order in which we called the transformations, Spark is free to plan the work differently:
- First, Spark might load only the values that will pass the filter into memory.
- Second, it might perform the map function last.
- Third, the transformations are executed only when the action `collect()` is called to return the entire contents of the new RDD as a list.

Such a swap would save memory and time because Spark would load fewer data points and map the lambda to fewer elements.
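Lazy evaluation can be imitated in plain Python with generators. This is only an analogy and says nothing about how Spark plans its work internally; the helper `traced_add_one` and the `log` list are invented for the illustration.

```python
log = []

def traced_add_one(x):
    log.append(x)   # record that the work actually happened
    return x + 1

data = [1, 2, 3, 4, 5]

# Building the pipeline does no work yet, because generators are lazy.
mapped = (traced_add_one(x) for x in data)
pipeline = (y for y in mapped if y > 3)
work_before = len(log)

# Consuming the pipeline plays the role of an action such as collect().
result = list(pipeline)
work_after = len(log)

print(work_before, work_after, result)  # 0 5 [4, 5, 6]
```

Nothing is computed while the pipeline is being described; all five calls happen only when the result is requested.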

Use `take(n)` to view the first n elements of a large RDD:

In [None]:
# input RDD [1,2,3,4,5]
rdd.take(3)
# returns the list [1, 2, 3]


We can use the action `reduce()` to aggregate the elements of our RDD into a single value by applying an operator to them.
For example, to sum all the values in the RDD, we can use `reduce()` with a lambda that adds the elements pairwise.



In [None]:
# input RDD [1,2,3,4,5]
rdd.reduce(lambda x,y: x+y)
# returns the value 15

`reduce()` is powerful because it can be used to apply many different operations to an RDD.


In [None]:
# Example:
sum_gpa = rdd_transformation.map(lambda x: x[2]).reduce(lambda x,y: x+y)
# view the sum
sum_gpa
# output: 843

sum_gpa / rdd_transformation.count()
# 84.3
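The same aggregation pattern can be tried without a cluster. The sketch below uses Python's built-in `functools.reduce` (not Spark's `reduce()`) on the ten grade values from the output above, and shows that the sum, the maximum, and the mean can each be expressed as a pairwise operation.

```python
from functools import reduce

# Third field of each tuple in rdd_transformation, copied from the output above.
values = [72, 83, 92, 81, 88, 99, 74, 82, 78, 94]

total = reduce(lambda x, y: x + y, values)
largest = reduce(lambda x, y: x if x > y else y, values)

# Carry (sum, count) pairs so the mean comes out of a single reduce.
pairs = [(v, 1) for v in values]
sum_count = reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]), pairs)
mean = sum_count[0] / sum_count[1]

print(total, largest, mean)  # 843 99 84.3
```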


### Associative and Commutative Properties


The `reduce(expression)` function we used previously is a powerful aggregation tool, but there are limitations to the operations it can apply to RDDs.

The `expression` must be commutative and associative due to the nature of parallelized computation:
- commutative: the output is independent of the order in which tasks complete
- associative: the output is independent of how the data is grouped

Spark operates in parallel — tasks that have commutative and associative properties allow for parallelization.
- The commutative property allows for all parallel tasks to execute and conclude without waiting for another task to complete.
- The associative property allows Spark to partition and distribute our data to multiple nodes because the result will stay the same no matter how tasks are grouped.

Let’s try to break that down a bit further with math! No matter how you switch up or break down summations, they’ll always have the same result thanks to the commutative and associative properties:

1+2+3+4+5 = (3+4+5)+(1+2) = (4)+(2+5+1)+(3) = 15
However, this is not the case with division:

1÷2÷3÷4÷5 ≠ (3÷4÷5)÷(1÷2) ≠ 4÷(2÷5÷1)÷3


![Associative and Commutative Properties](https://raw.githubusercontent.com/Hardi-Lore/codeacademy-notebook/main/pictures/Associative%20and%20Commutative%20Properties.png)


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() # start a SparkSession; end it with spark.stop() when finished



With the very handy transformation glom() we can print out how our data is partitioned and the resulting summation and division of the partitions.


In [None]:
data = [1,2,3,4,5]
for i in range(1,5):
   rdd = spark.sparkContext.parallelize(data, i)
   print('partition: ', rdd.glom().collect())
   print('sum: ', rdd.reduce(lambda a,b: a+b))


# partition:  [[1, 2, 3, 4, 5]]
# sum:  15
# partition:  [[1, 2], [3, 4, 5]]
# sum:  15
# partition:  [[1], [2, 3], [4, 5]]
# sum:  15
# partition:  [[1], [2], [3], [4, 5]]
# sum:  15


In [None]:
for i in range(1,5):
   rdd = spark.sparkContext.parallelize(data, i)
   print('partition: ', rdd.glom().collect())
   print('division: ', rdd.reduce(lambda a,b: a/b))

# partition:  [[1, 2, 3, 4, 5]]
# division:  0.008333333333333333
# partition:  [[1, 2], [3, 4, 5]]
# division:  3.3333333333333335
# partition:  [[1], [2, 3], [4, 5]]
# division:  1.875
# partition:  [[1], [2], [3], [4, 5]]
# division:  0.20833333333333331
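The effect can be reproduced in plain Python by reducing each partition first and then reducing the partial results. The slicing rule in `split_into_partitions` is an assumption chosen to reproduce the partitions printed above; it is not taken from Spark's source, and both helper names are invented for this sketch.

```python
from functools import reduce

def split_into_partitions(data, n):
    """Slice `data` into n contiguous partitions (assumed slicing rule)."""
    size = len(data)
    return [data[i * size // n:(i + 1) * size // n] for i in range(n)]

def partitioned_reduce(data, n, op):
    """Reduce each non-empty partition, then reduce the partial results."""
    parts = split_into_partitions(data, n)
    partials = [reduce(op, part) for part in parts if part]
    return reduce(op, partials)

data = [1, 2, 3, 4, 5]
sums = [partitioned_reduce(data, n, lambda a, b: a + b) for n in range(1, 5)]
quotients = [partitioned_reduce(data, n, lambda a, b: a / b) for n in range(1, 5)]

print(sums)       # the same total for every partitioning
print(quotients)  # a different value for every partitioning
```

Addition gives the same answer however the data is sliced, while division depends on where the partition boundaries fall, which is why it is not a safe operation for `reduce()`.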


### Shared variables

#### Broadcast Variables

Broadcast variables are read-only shared variables that are cached and available on all nodes in a cluster so that tasks can access or use them.
PySpark distributes broadcast variables to the workers using efficient broadcast algorithms to reduce communication costs.
We should avoid broadcasting large amounts of data, because it would be too expensive to serialize and send through the network.


In [None]:
# Example
# list of states
states = ['FL', 'NY', 'TX', 'CA', 'NY', 'NY', 'FL', 'TX']
# convert to RDD
states_rdd = spark.sparkContext.parallelize(states)

- We want the region instead of the state, such as “East” or “South”.
- The RDD is partitioned in the Spark cluster, and we don’t know which nodes contain data on which states.

We need to send the conversion information to all nodes because it’s very likely that each node will contain multiple distinct states.
We can provide each node with information on which states belong in each region. Spark calls this kind of shared information a broadcast variable.


Creating a conversion dictionary called region that matches each state to its region:


In [None]:
# dictionary of regions
region = {"NY":"East", "CA":"West", "TX":"South", "FL":"South"}


We can then broadcast our region dictionary and apply the conversion to each element in the RDD with our map function:

In [None]:
# broadcast region dictionary to nodes
broadcast_var = spark.sparkContext.broadcast(region)
# map regions to states
result = states_rdd.map(lambda x: broadcast_var.value[x])
# view first four results
result.take(4)
# output: ['South', 'East', 'South', 'West']


This is Spark’s efficient method of sharing variables amongst its nodes (also known as shared variables).
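One practical detail: indexing the dictionary directly, as in `broadcast_var.value[x]`, raises a `KeyError` if a state is missing from the lookup table. A minimal plain-Python sketch of a more forgiving lookup follows; the extra `"WA"` entry, the `"Unknown"` default, and the helper `to_region` are assumptions added for illustration.

```python
region = {"NY": "East", "CA": "West", "TX": "South", "FL": "South"}
states = ["FL", "NY", "TX", "CA", "WA"]  # "WA" is not in the lookup table

def to_region(state, lookup):
    """Return the region for a state, or a default when it is unknown."""
    return lookup.get(state, "Unknown")

result = [to_region(s, region) for s in states]
print(result)  # ['South', 'East', 'South', 'West', 'Unknown']
```

The same function body could be applied to the dictionary held in a broadcast variable's `.value`.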

Question:


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
student_data = [("Chris",1523,0.72,"CA"), ("Jake", 1555,0.83,"NY"), ("Cody", 1439,0.92,"CA"),
               ("Lisa",1442,0.81,"FL"), ("Daniel",1600,0.88,"TX"), ("Kelvin",1382,0.99,"FL"),
               ("Nancy",1442,0.74,"TX"), ("Pavel",1599,0.82,"NY"), ("Josh",1482,0.78,"CA"),
               ("Cynthia",1582,0.94,"CA")]
student_rdd = spark.sparkContext.parallelize(student_data)
rdd_transformation = student_rdd.map(lambda x: (x[0], x[1], int(x[2]*100), x[3]))
states = {"NY":"New York", "CA":"California", "TX":"Texas", "FL":"Florida"}



## YOUR SOLUTION HERE ##
broadcastStates = spark.sparkContext.broadcast(states)
# confirm type
type(broadcastStates) # pyspark.broadcast.Broadcast



## YOUR SOLUTION HERE ##
rdd_broadcast = rdd_transformation.map(lambda x: (x[0],x[1],x[2],broadcastStates.value[x[3]]))
# confirm transformation is correct
rdd_broadcast.collect()
"""# [('Chris', 1523, 72, 'California'), ('Jake', 1555, 83, 'New York'), ('Cody', 1439, 92, 'California'),
#  ('Lisa', 1442, 81, 'Florida'), ('Daniel', 1600, 88, 'Texas'),  ('Kelvin', 1382, 99, 'Florida'),
#  ('Nancy', 1442, 74, 'Texas'),  ('Pavel', 1599, 82, 'New York'),  ('Josh', 1482, 78, 'California'),
#  ('Cynthia', 1582, 94, 'California')]"""
spark.stop()

#### Accumulator Variables

Accumulator variables can be updated and are primarily used as counters or sums. Conceptually, they’re similar to the sum and count functions in NumPy.
- They can keep track of the inputs and outputs of each Spark task by aggregating the size of each subsequent transformation.
- They can help monitor for data loss: we could count the number of NULL values or the resulting size of each transformation.
- It’s best to avoid using accumulators in transformations. If a task fails, Spark re-executes it, and an accumulator updated inside a transformation can be incremented more than once. Spark guarantees that this does not happen to accumulators updated inside actions.
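
The re-execution caveat can be illustrated without a cluster. The sketch below is a plain-Python model, not Spark code: `Counter` and `run_task_with_retry` are hypothetical stand-ins for an accumulator and a retried task, and the "failure" is simulated. It only shows why a side effect inside re-run work gets counted twice.

```python
# Plain-Python model (not Spark): a task that is retried after a failure
# re-runs its side effects, so a counter updated inside it over-counts.

class Counter:
    """Hypothetical stand-in for an accumulator."""

    def __init__(self):
        self.value = 0

    def add(self, n):
        self.value += n


def run_task_with_retry(partition, counter, fail_first_attempt):
    """Process one partition; optionally simulate one failure after the work is done."""
    attempts = 0
    while True:
        attempts += 1
        for _ in partition:
            counter.add(1)  # the side effect happens on every attempt
        if fail_first_attempt and attempts == 1:
            continue  # simulated failure: the whole task is re-run
        return attempts


records = ["East", "West", "East"]

clean = Counter()
run_task_with_retry(records, clean, fail_first_attempt=False)

retried = Counter()
run_task_with_retry(records, retried, fail_first_attempt=True)

print(clean.value)    # 3: one increment per record
print(retried.value)  # 6: the retry counted every record twice
```

In this model the retried task reports twice the true count, which is the kind of over-counting the guideline above is meant to prevent.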

Suppose we want to know how many “East” versus “West” entries there are in a dataset.

- We could attempt to create a couple of variables to keep track of the counts, but we might run into serialization and overhead issues when datasets get really big.
- Let’s see how we can implement accumulator variables by counting the entries for each region. Since this will be a new dataset, let’s create an RDD first.


In [None]:
region = ['East', 'East', 'West', 'South', 'West', 'East', 'East', 'West', 'North']
rdd = spark.sparkContext.parallelize(region)

We’ll start off by initializing the accumulator variables at zero:

In [None]:
east = spark.sparkContext.accumulator(0)
west = spark.sparkContext.accumulator(0)

Let’s create a function to increment each accumulator by one whenever Spark encounters ‘East’ or ‘West’:


In [None]:
def countCoasts(r):
    if 'East' in r:
        east.add(1)
    elif 'West' in r:
        west.add(1)

We’ll take the function we created and run it against each element in the RDD.

In [None]:
rdd.foreach(lambda x: countCoasts(x))
print(east) # output: 4
print(west) # output: 3

In [None]:
# Question:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
student_data = [("Chris",1523,0.72,"CA"),
               ("Jake", 1555,0.83,"NY"),
               ("Cody", 1439,0.92,"CA"),
               ("Lisa",1442,0.81,"FL"),
               ("Daniel",1600,0.88,"TX"),
               ("Kelvin",1382,0.99,"FL"),
               ("Nancy",1442,0.74,"TX"),
               ("Pavel",1599,0.82,"NY"),
               ("Josh",1482,0.78,"CA"),
               ("Cynthia",1582,0.94,"CA")]
student_rdd = spark.sparkContext.parallelize(student_data)
rdd_transformation = student_rdd.map(lambda x: (x[0], x[1], int(x[2]*100), x[3]))
states = {"NY":"New York", "CA":"California", "TX":"Texas", "FL":"Florida"}
broadcastStates = spark.sparkContext.broadcast(states)
rdd_broadcast = rdd_transformation.map(lambda x: (x[0],x[1],x[2],broadcastStates.value[x[3]]))






## YOUR SOLUTION HERE ##
sat_1500 = spark.sparkContext.accumulator(0)


# confirm type
type(sat_1500) # pyspark.accumulators.Accumulator




## YOUR SOLUTION HERE ##
def count_high_sat_score(x):
   if x > 1500 : sat_1500.add(1)


# confirm saved as a function
print(count_high_sat_score) # <function count_high_sat_score at 0x7fde70198280>




## YOUR SOLUTION HERE ##
rdd_broadcast.foreach(lambda x:count_high_sat_score(x[1]))


# confirm accumulator worked
print(sat_1500) # 5


#### Review
Congratulations! You’ve just finished your first coding adventure with PySpark! In this lesson, we learned that:
- RDDs are the foundational data structure of Spark
- RDDs are fault-tolerant, partitioned, and operated on in parallel
- Transformations are lazy and do not execute until an action is called

We also learned how to:
- Transform and summarize RDDs with transformations and actions
- Send information to all nodes with broadcast variables
- Debug work with accumulator variables


![Apache Spark and PySpark](https://raw.githubusercontent.com/Hardi-Lore/codeacademy-notebook/main/pictures/RDDS%20WITH%20PYSPARK.png)

Suppose we have two Spark RDDs that we would like to join. One of these RDDs should be broadcast to each node before the join. Given the following information, which one should be broadcast?

In [None]:
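print(rdd1.count()) # 100 tuples
print(rdd2.count()) # 1,050,000,000 tuples

# Answer: broadcast the smaller RDD (rdd1).
# An RDD cannot be broadcast directly, so collect it to a dict on the driver first.
# Assumption: both RDDs hold (key, value) tuples and keys in rdd1 are unique.
small_lookup = spark.sparkContext.broadcast(rdd1.collectAsMap())
# Map-side join: look up each key of rdd2 in the broadcast dict
joined_rdd = rdd2.filter(lambda kv: kv[0] in small_lookup.value)\
    .map(lambda kv: (kv[0], (kv[1], small_lookup.value[kv[0]])))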
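
To see why the smaller dataset is the one to broadcast, it can help to model the join in plain Python. This is only a sketch with hypothetical sample data, not Spark code: the small side becomes a dictionary that every worker could hold in full, and the large side is streamed past it one record at a time.

```python
# Plain-Python model of a broadcast (map-side) join; no Spark required.
# Both datasets are hypothetical samples of (key, value) tuples.
small = [("CA", "California"), ("NY", "New York")]                 # the small side
large = [("CA", 1523), ("NY", 1555), ("CA", 1439), ("TX", 1600)]   # the large side

# This dictionary plays the role of the broadcast variable.
lookup = dict(small)

# Inner join: keep only records whose key exists in the lookup.
joined = [(key, (value, lookup[key])) for key, value in large if key in lookup]

print(joined)
# [('CA', (1523, 'California')), ('NY', (1555, 'New York')), ('CA', (1439, 'California'))]
```

Only the small dictionary has to be copied around; the large dataset never moves, which is the point of broadcasting the 100-tuple RDD rather than the billion-tuple one.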



What are the three properties of RDDs?
- Resilient
- Distributed
- Parallelized

RDDs are fault-tolerant (resilient), distributed throughout multiple nodes, and are operated on in parallel.

## PYSPARK SQL



### Introducing PySpark SQL

- We may not always want to perform complicated analysis directly on RDDs.
- Spark SQL is a module that can make common data analysis tasks simpler and faster.

The name Spark SQL is an umbrella term, as there are several ways to interact with data when using this module:



1. Basic inspection and querying data in a Spark DataFrame.
2. Same operations using standard SQL directly in our PySpark code.

* The entry point to Spark SQL is a `SparkSession`, which is a wrapper around a `SparkContext`.
* A `SparkSession` contains all the metadata required to start working with distributed data.
* Use `SparkSession.builder` to set configuration parameters and create a new session.

In [None]:
spark = SparkSession.builder.config('spark.app.name', 'learning_spark_sql').getOrCreate()


Here we set one configuration parameter, `spark.app.name`, and call the `.getOrCreate()` method to initialize the new `SparkSession`.

In [None]:
print(spark.sparkContext)
# <SparkContext master=local[*] appName=learning_spark_sql>

We can access the `SparkContext` for a session with `SparkSession.sparkContext`

- Use the `SparkSession` to create DataFrames, read external files, register tables, and run SQL queries over saved data.
- Terminate the session with `SparkSession.stop()` when we have finished the analysis or want to clear the Spark cache.
- Now that we’re familiar with the basics of `SparkSession`, the next step is to begin using Spark SQL to interact with data.
- DataFrames can be created manually from RDDs using `rdd.toDF(["names", "of", "columns"])`.


In [None]:
# Create an RDD from a list
hrly_views_rdd = spark.sparkContext.parallelize([
    ["Betty_White", 288886],
    ["Main_Page", 139564],
    ["New_Year's_Day", 7892],
    ["ABBA", 8154]
])
# Convert RDD to DataFrame
hrly_views_df = hrly_views_rdd.toDF(["article_title", "view_count"])

We can use the `DataFrame.show(n_rows)` method to print the first `n_rows` of a Spark DataFrame. Passing `truncate=False` ensures long values are displayed in full rather than cut off.



In [None]:
hrly_views_df.show(4, truncate=False)

![Apache Spark and PySpark](https://raw.githubusercontent.com/Hardi-Lore/codeacademy-notebook/main/pictures/PySpark%20SQL.png)

We can access a DataFrame’s underlying RDD with `DataFrame.rdd`.
Keep in mind that a DataFrame is a structure built on top of an RDD. When we check the type of hrly_views_df_rdd, we can see that it’s an RDD!


In [None]:
# Access DataFrame's underlying RDD
hrly_views_df_rdd = hrly_views_df.rdd
# Check object type
print(type(hrly_views_df_rdd))
# <class 'pyspark.rdd.RDD'>

### Spark DataFrames from External Sources
Next, let’s look at how to pull in larger datasets from external sources.



In [None]:
print(type(spark.read))
# <class 'pyspark.sql.readwriter.DataFrameReader'>


# Read CSV to DataFrame
hrly_views_df = spark.read\
    .option('header', True)\
    .option('delimiter', ' ')\
    .option('inferSchema', True)\
    .csv('views_2022_01_01_000000.csv')

There are a few things going on in this code, so let’s go through them one at a time:
- This code uses the `SparkSession.read` property to create a new `DataFrameReader`.
- The `DataFrameReader` has an `.option('option_name', 'option_value')` method that can be used to instruct Spark how to read a file.

In this case, we used the following options:

- `.option('header', True)`: Indicates the file already contains a header row. By default, Spark assumes there is no header.
- `.option('delimiter', ' ')`: Indicates each column is separated by a space (`' '`). By default, Spark assumes CSV columns are separated by commas.
- `.option('inferSchema', True)`: Instructs Spark to infer each column’s type from the data, which requires an extra pass over the file. By default, Spark will treat all CSV columns as strings.

The `DataFrameReader` also has a `.csv('path')` method which loads a CSV file and returns the result as a DataFrame.
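
To make the three options concrete without a Spark installation, here is a plain-Python analogy using the standard `csv` module. It is only a sketch: the two-row sample is hypothetical, and the `infer` helper is a crude stand-in for what `inferSchema` does, not Spark’s actual inference logic.

```python
import csv
import io

# Hypothetical two-row sample in a space-delimited layout with a header row
raw = "language_code article_title hourly_count\nen Main_Page 139564\nen ABBA 8154\n"

reader = csv.reader(io.StringIO(raw), delimiter=" ")  # analogous to option('delimiter', ' ')
header = next(reader)                                  # analogous to option('header', True)
rows = list(reader)


def infer(value):
    """Crude stand-in for inferSchema: try int, otherwise keep the string."""
    try:
        return int(value)
    except ValueError:
        return value


typed_rows = [[infer(v) for v in row] for row in rows]

print(header)         # ['language_code', 'article_title', 'hourly_count']
print(rows[0])        # ['en', 'Main_Page', '139564']  (everything is a string)
print(typed_rows[0])  # ['en', 'Main_Page', 139564]    (count is now an int)
```

Without the inference step every value stays a string, which mirrors Spark’s default behaviour for CSV columns.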

There are a few quick ways of checking that our data has been read in properly. The most direct way is checking DataFrame.show().


In [None]:
# Display first 5 rows of DataFrame
hrly_views_df.show(5, truncate=False)

![Apache Spark and PySpark](https://raw.githubusercontent.com/Hardi-Lore/codeacademy-notebook/main/pictures/Spark%20DataFrames%20from%20External%20Sources.png)

We used a `DataFrameReader` to pull a CSV from disk into our local Spark environment. However, Spark can read a wide variety of file formats. You can refer to the PySpark documentation to explore all available `DataFrameReader` options and file formats. In the following exercise, we’ll start to analyze the contents of this file.

### Inspecting and Cleaning Data With PySpark


Let’s look at how Spark can help with data exploration and data analysis.
Like Pandas, Spark DataFrames offer a series of operations for:
- cleaning data
- inspecting data
- transforming data

All DataFrames have a schema that defines their structure, columns, and datatypes. We can use `DataFrame.printSchema()` to show a DataFrame’s schema.

In [None]:
# Display DataFrame schema
hrly_views_df.printSchema()
"""output:
root
|-- language_code: string (nullable = true)
|-- article_title: string (nullable = true)
|-- hourly_count: integer (nullable = true)
|-- monthly_count: integer (nullable = true)
"""

- Use `DataFrame.describe()` to see a high-level summary of the data by column.
- The result is itself a DataFrame, so we append `.show()` to display it in our notebook.


In [None]:
hrly_views_df_desc = hrly_views_df.describe()
hrly_views_df_desc.show(truncate=False)

![Apache Spark and PySpark](https://raw.githubusercontent.com/Hardi-Lore/codeacademy-notebook/main/pictures/Spark%20DataFrames%20from%20External%20Sources-1.png)

From this summary, we can see:
- About 4.65 million unique pages were visited this hour
- The most visited page had almost 289,000 visitors, while the mean page had just over 4.5 visitors.

We can use `DataFrame.drop("columns", "to", "drop")` to drop columns by name.

In [None]:
# Drop `monthly_count` and display new DataFrame
hrly_views_df = hrly_views_df.drop('monthly_count')
hrly_views_df.show(5)

![Apache Spark and PySpark](https://raw.githubusercontent.com/Hardi-Lore/codeacademy-notebook/main/pictures/Spark%20DataFrames%20from%20External%20Sources-2.png)

We can replace a misleading header with a better name using `DataFrame.withColumnRenamed('old_name', 'new_name')`.

In [None]:
hrly_views_df = hrly_views_df.withColumnRenamed('article_title', 'page_title')
hrly_views_df.printSchema()

![Apache Spark and PySpark](https://raw.githubusercontent.com/Hardi-Lore/codeacademy-notebook/main/pictures/Spark%20DataFrames%20from%20External%20Sources-3.png)

- Spark assigned `nullable = true` to all columns.
- Whenever a `DataFrameReader` reads a CSV, it assigns `nullable = true` to all columns.
- Intuitively, we know that `article_title` shouldn’t be null.

### Querying PySpark DataFrames

- In this section, we’ll perform analysis with PySpark SQL.
- PySpark SQL DataFrames have built-in methods that can help with analyzing data.

Some methods for analysis:
- `.filter()` is similar to SQL’s `WHERE` clause.
- `.select()` is used to choose which columns to return in our result.
  - `DataFrame.select(["A", "B", "C"])` is similar to `SELECT A, B, C FROM DataFrame` in SQL.

- `.orderBy()` is similar to SQL’s `ORDER BY`.
  - Use `.orderBy('hourly_count', ascending=False)` to specify the sort column and order logic.

Examples:
To filter our data to pages from a specific Wikipedia `language_code` (e.g., `"kw.m"`):

In [None]:
hrly_views_df.filter(hrly_views_df.language_code == "kw.m").show(truncate=False)


![https://github.com/Hardi-Lore/codeacademy-notebook/tree/main/pictures](https://raw.githubusercontent.com/Hardi-Lore/codeacademy-notebook/main/pictures/Querying%20PySpark%20DataFrames.png)

- Keep only the columns we need, leaving out `monthly_count`.
- Display the data ordered by `hourly_count`.


In [None]:
hrly_views_df.filter(hrly_views_df.language_code == "kw.m")\
    .select(['language_code', 'article_title', 'hourly_count'])\
    .orderBy('hourly_count', ascending=False)\
    .show(5, truncate=False)


![https://github.com/Hardi-Lore/codeacademy-notebook/tree/main/pictures](https://raw.githubusercontent.com/Hardi-Lore/codeacademy-notebook/main/pictures/Querying%20PySpark%20DataFrames-1.png)


- Select the sum of `hourly_count` by `language_code`.
- This shows which sites were most active this hour.


In [None]:
hrly_views_df.select(['language_code', 'hourly_count'])\
    .groupBy('language_code')\
    .sum()\
    .orderBy('sum(hourly_count)', ascending=False)\
    .show(5, truncate=False)

![https://github.com/Hardi-Lore/codeacademy-notebook/tree/main/pictures](https://raw.githubusercontent.com/Hardi-Lore/codeacademy-notebook/main/pictures/Querying%20PySpark%20DataFrames-2.png)


- This code uses `.groupBy('language_code').sum()` to calculate the sum of all numeric columns grouped by `language_code`.
- It also orders our results with `.orderBy()`, using the name of the constructed column, `'sum(hourly_count)'`.

#### Querying PySpark with SQL

With the `SparkSession.sql()` method, we can analyze data in Spark with standard SQL.
- PySpark DataFrame’s query methods are an improvement on performing analysis directly on RDDs, but using them requires some practice.

Before querying a DataFrame with SQL in Spark, it must be saved to the SparkSession’s catalog.

In [None]:
hrly_views_df.createOrReplaceTempView('hourly_counts')


- This registers the DataFrame as a local temporary view in the session’s catalog.
- As long as the current `SparkSession` is active, we can use `SparkSession.sql()` to query it.

Each of the three sections of SQL below performs the same function as the DataFrame query methods described in the previous exercise.




In [None]:
# Pyspark SQL
query = """SELECT * FROM hourly_counts WHERE language_code = 'kw.m'"""
spark.sql(query).show(truncate=False)

# Pyspark
hrly_views_df.filter(hrly_views_df.language_code == "kw.m")\
    .show(truncate=False)

# Pyspark SQL
query = """SELECT language_code, article_title, hourly_count
   FROM hourly_counts
   WHERE language_code = 'kw.m'
   ORDER BY hourly_count DESC"""
spark.sql(query).show(truncate=False)

# Pyspark
hrly_views_df.filter(hrly_views_df.language_code == "kw.m")\
           .select(['language_code', 'article_title', 'hourly_count'])\
   		.orderBy('hourly_count', ascending=False)\
   		.show(5, truncate=False)


# Pyspark SQL
query = """SELECT language_code, SUM(hourly_count) as sum_hourly_count
   FROM hourly_counts
   GROUP BY language_code
   ORDER BY sum_hourly_count DESC"""
spark.sql(query).show(5, truncate=False)

# Pyspark
hrly_views_df.select(['language_code', 'hourly_count'])\
   		.groupBy('language_code')\
  		.sum() \
   		.orderBy('sum(hourly_count)', ascending=False)\
   		.show(5, truncate=False)
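
The equivalence between the SQL form and the method-chain form can be checked on a tiny dataset without Spark. The sketch below makes two assumptions that are not part of the lesson: it uses Python’s built-in `sqlite3` as a stand-in SQL engine (Spark SQL is a different engine with its own dialect), and the four sample rows are hypothetical.

```python
import sqlite3
from collections import defaultdict

# Hypothetical sample rows: (language_code, article_title, hourly_count)
rows = [
    ("en", "Main_Page", 139564),
    ("en", "ABBA", 8154),
    ("kw.m", "Kernow", 3),
    ("kw.m", "Penn_Folen", 1),
]

# SQL version, run against an in-memory SQLite table
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE hourly_counts "
    "(language_code TEXT, article_title TEXT, hourly_count INTEGER)"
)
conn.executemany("INSERT INTO hourly_counts VALUES (?, ?, ?)", rows)
sql_result = conn.execute(
    """
    SELECT language_code, SUM(hourly_count) AS sum_hourly_count
    FROM hourly_counts
    GROUP BY language_code
    ORDER BY sum_hourly_count DESC
    """
).fetchall()
conn.close()

# Programmatic version: group, sum, then sort in descending order
totals = defaultdict(int)
for language_code, _, hourly_count in rows:
    totals[language_code] += hourly_count
py_result = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)

print(sql_result)  # [('en', 147718), ('kw.m', 4)]
print(py_result)   # [('en', 147718), ('kw.m', 4)]
```

Both routes produce the same grouped totals, which is the same relationship the paired Spark snippets above are meant to show.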










### Saving PySpark DataFrames


Once you’ve done some analysis, the next step is often saving the transformed data back to disk for others to use. In this final topic, we’re going to cover how to efficiently save PySpark DataFrames.

- Spark DataFrames offer a `.write.csv()` method that saves the dataset to disk.

In [None]:
hrly_views_df.write.csv('cleaned/csv/views_2022_01_01_000000/', mode="overwrite")

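- Spark writes a DataFrame to a directory of files rather than a single CSV file, because Spark runs all operations in parallel.
- Both `.write.csv("path/to/directory", mode="overwrite")` and `.write.parquet("path/to/directory", mode="overwrite")` take a target directory; `mode="overwrite"` replaces any data already there.
- `.write.csv()` does not retain information about a dataset’s format/schema.
- `.write.parquet()` preserves information about a dataset’s schema.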

In [None]:
hrly_views_slim_df.write.parquet('cleaned/parquet/views_2022_01_01_000000/', mode="overwrite")
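
The point about CSV not retaining a schema can be seen with nothing more than the standard library. This is a plain-Python sketch with hypothetical sample rows, not Spark code: it writes typed values to CSV text and reads them back.

```python
import csv
import io

# Hypothetical rows whose last field is an integer
rows = [("en", "Main_Page", 139564), ("en", "ABBA", 8154)]

# Write the rows out as CSV text
buffer = io.StringIO()
csv.writer(buffer).writerows(rows)

# Read the same text back in
buffer.seek(0)
restored = [tuple(record) for record in csv.reader(buffer)]

print(type(rows[0][2]).__name__)      # int
print(type(restored[0][2]).__name__)  # str
```

The integer comes back as a string because CSV stores only text, so column types have to be inferred or declared again on every read. A format such as Parquet stores the schema alongside the data, which avoids that step.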



Question
1.
- PySpark DataFrames must have a defined column and row schema.
- In PySpark, a DataFrame is a distributed collection of data built on top of Spark’s resilient distributed datasets (RDDs).
- PySpark DataFrames are built on top of resilient distributed datasets (RDDs).
- PySpark DataFrames are distributed collections of data.

2. DataFrame.createOrReplaceTempView() can be used to save a DataFrame or query result in memory for future analysis.

3.
- Spark can parallelize operations on DataFrames regardless of the file format they were created from. However, operations on parquet files are often faster than on CSV files.
- Parquet files are efficiently compressed and thus smaller than CSV files containing the same data.
- Parquet files preserve information about a DataFrame’s schema.
- Performing analysis on parquet files is often faster than CSV files.



# Let's Learn to Implement PySpark

## Introducing PySpark SQL

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .master('local[*]')\
    .config('spark.driver.memory', '1g')\
    .config('spark.app.name', 'learning_spark_sql')\
    .getOrCreate()

# The variable `spark` is a `pyspark.sql.session.SparkSession`
type(spark)




In [None]:
# The underlying `SparkContext` can be accessed from the `SparkSession`
type(spark.sparkContext)


In [None]:
# Clear the SparkSession cache and delete the underlying `sparkContext`
spark.stop()

## Creating Spark DataFrames

In [None]:
from pyspark.sql import SparkSession

# Create a new SparkSession
spark = SparkSession\
    .builder\
    .config('spark.app.name', 'learning_spark_sql')\
    .getOrCreate()

sample_page_views  = spark.sparkContext.parallelize([
    ["en", "Statue_of_Liberty", "2022-01-01", 263],
    ["en", "Replicas_of_the_Statue_of_Liberty", "2022-01-01", 11],
    ["en", "Statue_of_Lucille_Ball" ,"2022-01-01", 6],
    ["en", "Statue_of_Liberty_National_Monument", "2022-01-01", 4],
    ["en", "Statue_of_Liberty_play"  ,"2022-01-01", 3],
])

1. Create a DataFrame from `sample_page_views`.

In [None]:
## YOUR SOLUTION HERE ##
sample_page_views_df = sample_page_views.toDF(["language_code", "title", "date", "count"])

# show first 5 rows
sample_page_views_df.show(5, truncate=False)

2. Access the RDD underlying `sample_page_views_df`.

In [None]:
## YOUR SOLUTION HERE ##
sample_page_views_rdd_restored = sample_page_views_df.rdd

# show restored RDD
sample_page_views_rdd_restored.collect()

In [None]:
# Clear the SparkSession cache and delete the underlying `sparkContext`
spark.stop()

## Spark DataFrames from External Sources

In [None]:
from pyspark.sql import SparkSession

# Create a new SparkSession
spark = SparkSession\
    .builder\
    .config('spark.app.name', 'learning_spark_sql')\
    .getOrCreate()

1. Read in the CSV file without any specific read options specified and show the top 10 rows.

In [None]:
## YOUR SOLUTION HERE ##
wiki_uniq_df = spark.read.csv('wiki_uniq_march_2022.csv')

# show the first 10 rows
wiki_uniq_df.show(10, truncate = False)

2. Read in the CSV file with an option to treat the first row as a header and show the top 10 rows.

In [None]:
## YOUR SOLUTION HERE ##
wiki_uniq_w_header_df = spark.read.option('header',True).csv('wiki_uniq_march_2022.csv')

# show the first 10 rows
wiki_uniq_w_header_df.show(10, truncate = False)

3. Check the data types of `wiki_uniq_w_header_df`.

In [None]:
# show the data types
wiki_uniq_w_header_df.dtypes

4. Read in the CSV file with an option to treat the first row as a header and infer the schema. Then check the data types of `wiki_uniq_w_schema_df`.

In [None]:
## YOUR SOLUTION HERE ##
wiki_uniq_w_schema_df = spark.read.option('header',True).option('inferSchema', True).csv('wiki_uniq_march_2022.csv')

# show the data types
wiki_uniq_w_schema_df.dtypes

In [None]:
# Clear the SparkSession cache and delete the underlying `sparkContext`
spark.stop()

## Inspecting and Cleaning Data With PySpark

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("learning_spark_sql") \
    .getOrCreate()

# read in the Wikipedia unique visitors dataset
uniq_views_df = spark.read\
    .option('header', True) \
    .option('delimiter', ',') \
    .option('inferSchema', True) \
    .csv("wiki_uniq_march_2022.csv")

1. Print the DataFrame schema for `uniq_views_df`.

In [None]:
## YOUR SOLUTION HERE ##
uniq_views_df.printSchema()

2. Show a description of the data for `uniq_views_df`.

In [None]:
## YOUR SOLUTION HERE ##
uniq_views_df_desc = uniq_views_df.describe()

# show summary
uniq_views_df_desc.show()

3. Drop the columns `total_visitor_count` and `uniq_bot_visitors`.

In [None]:
## YOUR SOLUTION HERE ##
uniq_counts_human_df = uniq_views_df.drop('total_visitor_count','uniq_bot_visitors')

# show the first 5 rows
uniq_counts_human_df.show(5)

4. Rename `uniq_human_visitors` to `unique_site_visitors`.

In [None]:
## YOUR SOLUTION HERE ##
uniq_counts_final_df = uniq_counts_human_df.withColumnRenamed('uniq_human_visitors', 'unique_site_visitors')

# show the first 5 rows
uniq_counts_final_df.show(5)

In [None]:
# Clear the SparkSession cache and delete the underlying `sparkContext`
spark.stop()

## Querying PySpark DataFrames

In [None]:
from pyspark.sql import SparkSession

# Create a New SparkSession
spark = SparkSession \
    .builder \
    .appName("learning_spark_sql") \
    .getOrCreate()

# Read in Wikipedia Unique Visitors Dataset
wiki_uniq_df = spark.read\
    .option('header', True) \
    .option('delimiter', ',') \
    .option('inferSchema', True) \
    .csv("wiki_uniq_march_2022_w_site_type.csv")

1. Filter the DataFrame to sites where `language_code` is `"ar"`.

In [None]:
## YOUR SOLUTION HERE ##
ar_site_visitors = wiki_uniq_df\
    .filter(wiki_uniq_df.language_code == 'ar')

# show the DataFrame
ar_site_visitors.show()

2. Filter the DataFrame to sites where `language_code` is `"ar"` and keep only the columns `domain` and `uniq_human_visitors`.

In [None]:
## YOUR SOLUTION HERE ##
ar_visitors_slim = wiki_uniq_df\
    .select(['domain', 'uniq_human_visitors'])\
    .filter(wiki_uniq_df.language_code == 'ar')

# show the DataFrame
ar_visitors_slim.show()

3. Calculate the sum of all `uniq_human_visitors` grouped by `site_type` and ordered from highest to lowest page views.

In [None]:
## YOUR SOLUTION HERE ##
top_visitors_site_type = wiki_uniq_df.select(['site_type', 'uniq_human_visitors'])\
    .groupBy('site_type')\
    .sum()\
    .orderBy('sum(uniq_human_visitors)', ascending=False)

# show the DataFrame
top_visitors_site_type.show()

In [None]:
# Clear the SparkSession cache and delete the underlying `sparkContext`
spark.stop()

## Querying PySpark with SQL

In [None]:
from pyspark.sql import SparkSession

# Create a new SparkSession
spark = SparkSession \
    .builder \
    .appName("learning_spark_sql") \
    .getOrCreate()

# Read in Wikipedia Unique Visitors Dataset
wiki_uniq_df = spark.read\
    .option('header', True) \
    .option('delimiter', ',') \
    .option('inferSchema', True) \
    .csv("wiki_uniq_march_2022_w_site_type.csv")

# Create a temporary view with the DataFrame
wiki_uniq_df\
    .createOrReplaceTempView('uniq_visitors_march')

1. Filter the DataFrame to sites where `language_code` is `"ar"`.

In [None]:
## YOUR SOLUTION HERE ##
ar_site_visitors_qry = """
    SELECT * FROM uniq_visitors_march
    WHERE language_code = 'ar';
"""

# show the DataFrame
spark\
    .sql(ar_site_visitors_qry)\
    .show(truncate=False)

2. Filter the DataFrame to sites where `language_code` is `"ar"` and keep only the columns `domain` and `uniq_human_visitors`.

In [None]:
## YOUR SOLUTION HERE ##
ar_site_visitors_slim_qry = """
    SELECT domain, uniq_human_visitors
    FROM uniq_visitors_march
    WHERE language_code = 'ar';
"""

# show the DataFrame
spark\
    .sql(ar_site_visitors_slim_qry)\
    .show(truncate=False)

3. Calculate the sum of all uniq_human_visitors grouped by site_type and ordered from highest to lowest uniq_human_visitors.

In [None]:
## YOUR SOLUTION HERE ##
site_top_type_qry = """
    SELECT site_type, SUM(uniq_human_visitors)
    FROM uniq_visitors_march
    GROUP BY site_type
    ORDER BY SUM(uniq_human_visitors) DESC;
"""

# show the DataFrame
spark\
    .sql(site_top_type_qry)\
    .show(truncate=False)

In [None]:
# Clear the SparkSession cache and delete the underlying `sparkContext`
spark.stop()

## Saving PySpark DataFrames

In [None]:
from pyspark.sql import SparkSession

# Create a new SparkSession
spark = SparkSession\
    .builder\
    .config('spark.app.name', 'learning_spark_sql')\
    .getOrCreate()

# Read in Wikipedia Unique Visitors Dataset
wiki_uniq_df = spark.read\
    .option('header', True) \
    .option('delimiter', ',') \
    .option('inferSchema', True) \
    .csv("wiki_uniq_march_2022.csv")

1. Run the code to create a new DataFrame with only `domain` and `uniq_human_visitors`.

In [None]:
# select only domain and uniq_human visitors
uniq_human_visitors_df = wiki_uniq_df\
    .select('domain', 'uniq_human_visitors')

# show the new DataFrame
uniq_human_visitors_df.show()

2. Save the new DataFrame as CSV files.

In [None]:
## YOUR SOLUTION HERE ##
uniq_human_visitors_df.write.csv("./results/csv/uniq_human_visitors/"
                                 , mode="overwrite")


3. Save the new DataFrame as Parquet files.

In [None]:
## YOUR SOLUTION HERE ##
uniq_human_visitors_df.write.parquet("./results/pq/uniq_human_visitors/"
                                     , mode="overwrite")


In [None]:
# Clear the SparkSession cache and delete the underlying `sparkContext`
spark.stop()

## Implementation

In [None]:
from pyspark.sql import SparkSession

# Create a new SparkSession
spark = SparkSession\
    .builder\
    .config('spark.app.name', 'learning_spark_sql')\
    .getOrCreate()

# Read in Wikipedia Unique Visitors Dataset
wiki_uniq_df = spark.read\
    .option('header', True) \
    .option('delimiter', ',') \
    .option('inferSchema', True) \
    .csv("wiki_uniq_march_2022_w_site_type.csv")

In [None]:
# Clear the SparkSession cache and delete the underlying `sparkContext`
spark.stop()

In [None]:
# Create a new DataFrame named `internal_clickstream`
internal_clickstream = clickstream.select(["source_page", "target_page", "click_count"])\
                                  .filter(clickstream.link_category == 'link')

# Display the first few rows of the DataFrame in the notebook
internal_clickstream.show(5, truncate=False)


internal_clickstream = clickstream\
    .select(["source_page", "target_page", "click_count"])\
    .filter(clickstream.link_category == 'link')

# Display the first few rows of the DataFrame in the notebook
internal_clickstream.show(truncate=False)

# Pyspark project


## Analyzing Wikipedia Clickstream Data

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Create a new SparkSession and assign it to a variable named spark .
spark = SparkSession \
 .builder \
 .getOrCreate()


In [None]:
"""Create an RDD from a list of sample clickstream counts and save it as
clickstream_counts_rdd .
"""
sample_clickstream_counts = [
 ["other-search", "Hanging_Gardens_of_Babylon", "external", 47000],
 ["other-empty", "Hanging_Gardens_of_Babylon", "external", 34600],
 ["Wonders_of_the_World", "Hanging_Gardens_of_Babylon", "link", 14000],
 ["Babylon", "Hanging_Gardens_of_Babylon", "link", 2500]
]
clickstream_counts_rdd = spark.sparkContext.parallelize(
 sample_clickstream_counts
)


In [None]:
# Create a DataFrame from the RDD of sample clickstream counts
clickstream_sample_df = clickstream_counts_rdd\
 .toDF(["source_page", "target_page", "link_category", "link_count"])
# Display the DataFrame to the notebook
clickstream_sample_df.show(5, truncate=False)


Output:


```
+--------------------+--------------------------+-------------+----------+
|source_page         |target_page               |link_category|link_count|
+--------------------+--------------------------+-------------+----------+
|other-search        |Hanging_Gardens_of_Babylon|external     |47000     |
|other-empty         |Hanging_Gardens_of_Babylon|external     |34600     |
|Wonders_of_the_World|Hanging_Gardens_of_Babylon|link         |14000     |
|Babylon             |Hanging_Gardens_of_Babylon|link         |2500      |
+--------------------+--------------------------+-------------+----------+
```

In [None]:
# Read the target directory (`./cleaned/clickstream/`) into a DataFrame (`clickstream`)
clickstream = spark.read \
 .option('header', True) \
 .option('delimiter', '\t') \
 .option('inferSchema', True) \
 .csv("./cleaned/clickstream/")
# Display the DataFrame to the notebook
clickstream.show(5, truncate=False)

Output:

```
+-------------------+--------------+-------------+-------------+-----------+
|referrer           |resource      |link_category|language_code|click_count|
+-------------------+--------------+-------------+-------------+-----------+
|Daniel_Day-Lewis   |Phantom_Thread|link         |en           |43190      |
|other-internal     |Phantom_Thread|external     |en           |21683      |
|other-empty        |Phantom_Thread|external     |en           |169532     |
|90th_Academy_Awards|Phantom_Thread|link         |en           |40449      |
|other-search       |Phantom_Thread|external     |en           |536940     |
+-------------------+--------------+-------------+-------------+-----------+
```
only showing top 5 rows



In [None]:
# Display the schema of the `clickstream` DataFrame to the notebook
clickstream.printSchema()

Output:

```
root
|-- referrer: string (nullable = true)
|-- resource: string (nullable = true)
|-- link_category: string (nullable = true)
|-- language_code: string (nullable = true)
|-- click_count: integer (nullable = true)
```



In [None]:
# Drop the language_code column
clickstream = clickstream.drop("language_code")

# Display the first few rows of the DataFrame and the new schema in the notebook
clickstream.show(5, truncate=False)
clickstream.printSchema()

Output:

```
+-------------------+--------------+-------------+-----------+
|referrer           |resource      |link_category|click_count|
+-------------------+--------------+-------------+-----------+
|Daniel_Day-Lewis   |Phantom_Thread|link         |43190      |
|other-internal     |Phantom_Thread|external     |21683      |
|other-empty        |Phantom_Thread|external     |169532     |
|90th_Academy_Awards|Phantom_Thread|link         |40449      |
|other-search       |Phantom_Thread|external     |536940     |
+-------------------+--------------+-------------+-----------+
```
only showing top 5 rows



```
root
|-- referrer: string (nullable = true)
|-- resource: string (nullable = true)
|-- link_category: string (nullable = true)
|-- click_count: integer (nullable = true)
```



In [None]:
# Rename `referrer` and `resource` to `source_page` and `target_page`
clickstream = clickstream\
 .withColumnRenamed("referrer", "source_page")\
 .withColumnRenamed("resource", "target_page")
# Display the first few rows of the DataFrame and the new schema in the notebook
clickstream.show(5, truncate=False)
clickstream.printSchema()



Output:
```
+-------------------+--------------+-------------+-----------+
|source_page        |target_page   |link_category|click_count|
+-------------------+--------------+-------------+-----------+
|Daniel_Day-Lewis   |Phantom_Thread|link         |43190      |
|other-internal     |Phantom_Thread|external     |21683      |
|other-empty        |Phantom_Thread|external     |169532     |
|90th_Academy_Awards|Phantom_Thread|link         |40449      |
|other-search       |Phantom_Thread|external     |536940     |
+-------------------+--------------+-------------+-----------+
```
only showing top 5 rows



```
root
|-- source_page: string (nullable = true)
|-- target_page: string (nullable = true)
|-- link_category: string (nullable = true)
|-- click_count: integer (nullable = true)
```


In [None]:
"""Create a temporary view in the metadata for this `SparkSession` to make the
data queryable with `SparkSession.sql()`.
"""
clickstream.createOrReplaceTempView("clickstream")

In [None]:
"""Filter the dataset to entries with Hanging_Gardens_of_Babylon as the
target_page and order the result by click_count using PySpark DataFrame methods.
"""

clickstream\
 .filter(clickstream.target_page == 'Hanging_Gardens_of_Babylon')\
 .orderBy('click_count', ascending=False)\
 .show(10, truncate=False)



Output:


```
+----------------------------------+--------------------------+--------------+-----------+
|source_page                       |target_page               |link_category |click_count|
+----------------------------------+--------------------------+--------------+-----------+
|other-search                      |Hanging_Gardens_of_Babylon|external      |47088      |
|other-empty                       |Hanging_Gardens_of_Babylon|external      |34619      |
|Wonders_of_the_World              |Hanging_Gardens_of_Babylon|link          |14668      |
|Seven_Wonders_of_the_Ancient_World|Hanging_Gardens_of_Babylon|link          |12296      |
+----------------------------------+--------------------------+--------------+-----------+
```



In [None]:
# Perform the same analysis as the previous exercise using a SQL query.
# Filter and sort the DataFrame using SQL
spark.sql(
 """
 SELECT *
 FROM clickstream
 WHERE target_page = 'Hanging_Gardens_of_Babylon'
 ORDER BY click_count DESC
 """
).show(10, truncate=False)

Output:


```
+----------------------------------+--------------------------+--------------+-----------+
|source_page                       |target_page               |link_category |click_count|
+----------------------------------+--------------------------+--------------+-----------+
|other-search                      |Hanging_Gardens_of_Babylon|external      |47088      |
|other-empty                       |Hanging_Gardens_of_Babylon|external      |34619      |
|Wonders_of_the_World              |Hanging_Gardens_of_Babylon|link          |14668      |
|Seven_Wonders_of_the_Ancient_World|Hanging_Gardens_of_Babylon|link          |12296      |
+----------------------------------+--------------------------+--------------+-----------+
```



In [None]:
# Calculate the sum of click_count grouped by link_category using PySpark DataFrame methods.
# Aggregate the DataFrame using PySpark DataFrame Methods
clickstream\
 .groupBy('link_category')\
 .sum()\
 .show(truncate=False)



Output:


```
+-------------+----------------+
|link_category|sum(click_count)|
+-------------+----------------+
|link         |97805811        |
|other        |9338172         |
|external     |3248677856      |
+-------------+----------------+
```




In [None]:
# Perform the same analysis as the previous exercise using a SQL query.
# Aggregate the DataFrame using SQL
spark.sql(
 """
 SELECT link_category, SUM(click_count)
 FROM clickstream
 GROUP BY link_category
 """
).show(truncate=False)



Output:


```
+-------------+----------------+
|link_category|sum(click_count)|
+-------------+----------------+
|link         |97805811        |
|other        |9338172         |
|external     |3248677856      |
+-------------+----------------+
```

In [None]:
"""Let's create a new DataFrame named `internal_clickstream` that only contains
article pairs where `link_category` is `link`.
"""

# Create a new DataFrame (named `internal_clickstream`) using `filter` to keep rows that
# match a specific condition and `select` to choose which columns to return from the query.
# Filtering comes first because `select` drops the `link_category` column.
internal_clickstream = clickstream\
 .filter(clickstream.link_category == 'link')\
 .select(["source_page", "target_page", "click_count"])
# Display the first few rows of the DataFrame in the notebook
internal_clickstream.show(truncate=False)

Output:


```
+----------------------------+----------------------------+-----------+
|source_page                 |target_page                 |click_count|
+----------------------------+----------------------------+-----------+
|Daniel_Day-Lewis            |Phantom_Thread              |43190      |
|90th_Academy_Awards         |Phantom_Thread              |40449      |
|Shinee                      |Kim_Jong-hyun_(singer)      |24433      |
|Agnyaathavaasi              |Anu_Emmanuel                |15020      |
|Naa_Peru_Surya              |Anu_Emmanuel                |12361      |
|Mariah_Carey                |Nick_Cannon                 |16214      |
|Kesha                       |Rainbow_(Kesha_album)       |11448      |
|David_Attenborough          |John_Attenborough           |11252      |
|Boney_M.                    |Bobby_Farrell               |14095      |
|The_End_of_the_F***ing_World|Jessica_Barden              |237279     |
|Quentin_Tarantino           |The_Hateful_Eight           |12018      |
|Ready_Player_One_(film)     |Olivia_Cooke                |17468      |
|Royal_Rumble_(2018)         |Kevin_Owens_and_Sami_Zayn   |11503      |
|Macaulay_Culkin             |Brenda_Song                 |20477      |
|Altered_Carbon              |Altered_Carbon_(TV_series)  |23962      |
|Lil_Pump                    |Smokepurpp                  |36736      |
|Fifth_Harmony               |Camila_Cabello              |30959      |
|Havana_(Camila_Cabello_song)|Camila_Cabello              |12803      |
|Jennifer_Aniston            |John_Aniston                |26498      |
|Kingsman:_The_Golden_Circle |Kingsman:_The_Secret_Service|11969      |
+----------------------------+----------------------------+-----------+
only showing top 20 rows

```



In [None]:
# Save the `internal_clickstream` DataFrame to a series of CSV files in `./results/article_links_csv/`
# with `DataFrame.write.csv()`
internal_clickstream\
 .write\
 .csv('./results/article_links_csv/', mode="overwrite")

In [None]:

# Save the `internal_clickstream` DataFrame to a series of parquet files in `./results/article_links_parquet/`
# with `DataFrame.write.parquet()`
internal_clickstream\
 .write\
 .parquet('./results/article_links_parquet/', mode="overwrite")

In [None]:
# Close the SparkSession and underlying SparkContext. What happens if we call clickstream.show() after closing the SparkSession?

# Stop the notebook's `SparkSession` and `SparkContext`
spark.stop()

# The SparkSession and SparkContext are stopped; the following line will throw an error:
clickstream.show()