<a href="https://colab.research.google.com/github/MikeHankinson/Amazon_Vine_Analysis/blob/main/Spark_DataFrames_Functions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Install PySpark** 
PySpark does not come preinstalled in Google Colab, so the cell below installs Java, Spark, and findspark.

---





In [2]:
import os
# Pick a Spark 3.0.x release from https://archive.apache.org/dist/spark/ and enter it as the Spark version.
# For example:
# spark_version = 'spark-3.0.1'
spark_version = 'spark-3.0.1'
os.environ['SPARK_VERSION'] = spark_version

# Install Java and Spark
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set environment variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Make the Spark installation importable (the SparkSession is started in the next cell)
import findspark
findspark.init()



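The tarball name in the `wget` URL, the folder that `tar` creates, and `SPARK_HOME` are all derived from the same `spark_version` string, so a typo in one place breaks the others. The sketch below keeps them consistent. It assumes the Apache archive layout `https://archive.apache.org/dist/spark/<version>/<version>-bin-hadoop<hadoop>.tgz` and Colab's `/content` working directory; the helper names `spark_download_url` and `spark_home` are hypothetical.

```python
# Hypothetical helpers: derive every Spark path from one version string.
ARCHIVE_BASE = "https://archive.apache.org/dist/spark"  # assumed download host


def spark_download_url(spark_version: str, hadoop_version: str = "2.7") -> str:
    """Return the URL of the prebuilt Spark tarball for the given versions."""
    tarball = f"{spark_version}-bin-hadoop{hadoop_version}.tgz"
    return f"{ARCHIVE_BASE}/{spark_version}/{tarball}"


def spark_home(spark_version: str, hadoop_version: str = "2.7", root: str = "/content") -> str:
    """Return the folder created by `tar xf`, which SPARK_HOME should point to."""
    return f"{root}/{spark_version}-bin-hadoop{hadoop_version}"


print(spark_download_url("spark-3.0.1"))
print(spark_home("spark-3.0.1"))
```

Changing only `spark_version` then updates the download URL and `SPARK_HOME` together.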

**16.4.2 Start a Spark Session:**

---



In [3]:
# Start Spark session 16.4.2
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("DataFrameBasics").getOrCreate()


# **16.4.2 DataFrames: Spark**

16.4.2 Create a **df from scratch**:

---



In [4]:
# createDataFrame() builds a DataFrame from a list of tuples and column names; show() prints it (like head() in Pandas)
dataframe = spark.createDataFrame([(0, "Here is our DataFrame"),
                                  (1, "We are making one from scratch"),
                                  (2,"This is very similar to a Pandas DataFrame")],
                                 ["ID", "Words"]
                                 )

dataframe.show()


+---+--------------------+
| ID|               Words|
+---+--------------------+
|  0|Here is our DataF...|
|  1|We are making one...|
|  2|This is very simi...|
+---+--------------------+
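The `Words` column ends in `...` because `show()` truncates by default: any string longer than 20 characters is cut to its first 17 characters plus `...`. Calling `dataframe.show(truncate=False)` prints the full values. The plain-Python function below, `truncate_cell` (a hypothetical helper, not part of PySpark), mimics that rule so you can predict how a cell will be displayed.

```python
def truncate_cell(value: str, truncate: int = 20) -> str:
    """Mimic how DataFrame.show() shortens a cell (truncate=20 is the default)."""
    if truncate > 0 and len(value) > truncate:
        # Very small limits are cut without an ellipsis
        if truncate < 4:
            return value[:truncate]
        # Otherwise keep truncate - 3 characters and append "..."
        return value[: truncate - 3] + "..."
    return value


words = [
    "Here is our DataFrame",
    "We are making one from scratch",
    "This is very similar to a Pandas DataFrame",
]
for w in words:
    print(truncate_cell(w))
```

The three printed values match the `Words` column in the output above.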



16.4.2 **Create a df** by importing a file from Amazon's **Simple Storage Service (S3)**:

---



In [5]:
# 1. Read in data from S3 Buckets 
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/dataviz-curriculum/day_1/food.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("food.csv"), sep=",", header=True)

df.show()

# 2. Print our schema
df.printSchema()

# 3. Describe the data (describe() returns a new DataFrame, so add .show() to print the statistics)
df.describe()



+-------+-----+
|   food|price|
+-------+-----+
|  pizza|    0|
|  sushi|   12|
|chinese|   10|
+-------+-----+

root
 |-- food: string (nullable = true)
 |-- price: string (nullable = true)



DataFrame[summary: string, food: string, price: string]

In [6]:
# Show the columns
df.columns

['food', 'price']

16.4.2 Create the **schema** by:

---



1. Importing the struct field types.

2. Creating a list of **StructField** objects. Each StructField will:

   a. define the column name,

   b. define the data type the column holds, and

   c. set a Boolean that determines whether the column may contain null values.

3. Passing the list of fields to a **StructType**, one of Spark's complex types (like an array or map), and storing it in the variable ***final***.

4. Reading in the data again, only this time passing in our own schema.

In [7]:
# 1. Import struct fields that we can use 16.4.2
from pyspark.sql.types import StructField, StringType, IntegerType, StructType


# 2. Next we need to create the list of struct fields
schema = [StructField("food", StringType(), True), StructField("price", IntegerType(), True),]
schema

[StructField(food,StringType,true), StructField(price,IntegerType,true)]

In [8]:
# 3. Pass in our fields
final = StructType(fields=schema)
final

StructType(List(StructField(food,StringType,true),StructField(price,IntegerType,true)))

In [9]:
# 4. Read our data with our new schema
dataframe = spark.read.csv(SparkFiles.get("food.csv"), schema=final, sep=",", header=True)
dataframe.printSchema()

root
 |-- food: string (nullable = true)
 |-- price: integer (nullable = true)
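Supplying the schema is what turned `price` from `string` into `integer`. Each StructField is essentially a (name, type, nullable) triple, and the plain-Python sketch below applies a list of such triples to the three rows of `food.csv` shown earlier (embedded as a string). `FIELDS` and `apply_schema` are hypothetical illustrations, not Spark's CSV parser; treating an empty field as null is an assumption of the sketch.

```python
import csv
import io

# The three rows of food.csv shown earlier, embedded so the sketch is self-contained
RAW = "food,price\npizza,0\nsushi,12\nchinese,10\n"

# Hypothetical stand-in for [StructField(name, dataType, nullable), ...]
FIELDS = [("food", str, True), ("price", int, True)]


def apply_schema(text, fields):
    """Parse CSV text and convert each column with its declared type."""
    reader = csv.DictReader(io.StringIO(text))
    rows = []
    for record in reader:
        row = []
        for name, cast, nullable in fields:
            raw = record.get(name)
            if raw in (None, ""):
                # Missing value: allowed only if the field is nullable
                if not nullable:
                    raise ValueError(f"column {name!r} is not nullable")
                row.append(None)
            else:
                row.append(cast(raw))
        rows.append(tuple(row))
    return rows


print(apply_schema(RAW, FIELDS))
```

Without the `int` entry every value would stay a string, which is what the first `printSchema()` call reported.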



16.4.2 Methods for **Accessing DataFrames in Spark**:

---




In [10]:
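# Notice the price column is now an integer (price: int).
# Without parentheses, describe is the bound method itself, and its repr shows the schema.
dataframe.describe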

<bound method DataFrame.describe of DataFrame[food: string, price: int]>

In [11]:
dataframe['price']

Column<b'price'>

In [12]:
type(dataframe['price'])

pyspark.sql.column.Column

In [13]:
dataframe.select('price')

DataFrame[price: int]

In [14]:
type(dataframe.select('price'))

pyspark.sql.dataframe.DataFrame

In [15]:
dataframe.select('price').show()

+-----+
|price|
+-----+
|    0|
|   12|
|   10|
+-----+



In [16]:
# show() is like head() in Pandas.
# Notice that show() is an action, so we get results;
# select() on its own is a transformation, so no results are shown.
dataframe.show()

+-------+-----+
|   food|price|
+-------+-----+
|  pizza|    0|
|  sushi|   12|
|chinese|   10|
+-------+-----+
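The split between transformations and actions is Spark's lazy evaluation: transformations such as `select()` only add a step to a plan, and an action such as `show()` or `collect()` makes Spark execute that plan. The toy class below, `LazyFrame`, is hypothetical and shares only this idea with Spark: it records steps and runs them when `collect()` is called.

```python
class LazyFrame:
    """Toy model of lazy evaluation: transformations queue steps, actions run them."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = tuple(plan)  # each step is a function: list of rows -> list of rows

    def select(self, *cols):
        # Transformation: return a new LazyFrame with one more step; nothing runs yet
        def step(rows):
            return [{c: r[c] for c in cols} for r in rows]
        return LazyFrame(self._rows, self._plan + (step,))

    def filter(self, predicate):
        # Transformation: also deferred
        def step(rows):
            return [r for r in rows if predicate(r)]
        return LazyFrame(self._rows, self._plan + (step,))

    def collect(self):
        # Action: execute every queued step in order and return the result
        rows = self._rows
        for step in self._plan:
            rows = step(rows)
        return rows


calls = []


def cheap(row):
    calls.append(row["food"])  # records when the predicate actually runs
    return row["price"] < 11


food = LazyFrame([
    {"food": "pizza", "price": 0},
    {"food": "sushi", "price": 12},
    {"food": "chinese", "price": 10},
])
query = food.filter(cheap).select("food")
print(calls)            # still empty: only transformations so far
print(query.collect())  # the action runs the plan
print(calls)            # now every row has been checked
```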



16.4.2 Manipulate Columns in Spark

---



In [17]:
# Add a new column (withColumn() returns a new DataFrame and leaves the original unchanged)
dataframe.withColumn('newprice', dataframe['price']).show()

+-------+-----+--------+
|   food|price|newprice|
+-------+-----+--------+
|  pizza|    0|       0|
|  sushi|   12|      12|
|chinese|   10|      10|
+-------+-----+--------+



In [18]:
# Update column name
dataframe.withColumnRenamed('price','newerprice').show()

+-------+----------+
|   food|newerprice|
+-------+----------+
|  pizza|         0|
|  sushi|        12|
|chinese|        10|
+-------+----------+



In [19]:
# Double the price
dataframe.withColumn('doubleprice',dataframe['price']*2).show()

+-------+-----+-----------+
|   food|price|doubleprice|
+-------+-----+-----------+
|  pizza|    0|          0|
|  sushi|   12|         24|
|chinese|   10|         20|
+-------+-----+-----------+



In [20]:
# Add a dollar to the price
dataframe.withColumn('add_one_dollar',dataframe['price']+1).show()

+-------+-----+--------------+
|   food|price|add_one_dollar|
+-------+-----+--------------+
|  pizza|    0|             1|
|  sushi|   12|            13|
|chinese|   10|            11|
+-------+-----+--------------+



In [21]:
# Halve the price
dataframe.withColumn('half_price',dataframe['price']/2).show()

+-------+-----+----------+
|   food|price|half_price|
+-------+-----+----------+
|  pizza|    0|       0.0|
|  sushi|   12|       6.0|
|chinese|   10|       5.0|
+-------+-----+----------+



In [22]:
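# Reassign to keep the new column: withColumn() returns a new DataFrame,
# while show() only prints and returns None
df = dataframe
df.show()
df = df.withColumn('half_price', df['price']/2)
df.show()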

+-------+-----+
|   food|price|
+-------+-----+
|  pizza|    0|
|  sushi|   12|
|chinese|   10|
+-------+-----+

+-------+-----+----------+
|   food|price|half_price|
+-------+-----+----------+
|  pizza|    0|       0.0|
|  sushi|   12|       6.0|
|chinese|   10|       5.0|
+-------+-----+----------+
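When reassigning `df` in a cell like the one above, keep `show()` on its own line: DataFrames are immutable, `withColumn()` returns a new DataFrame, and `show()` prints and returns `None`. Chaining it onto an assignment, as in `df = df.withColumn(...).show()`, leaves `df` set to `None`. The stand-in below (`Frame` is a hypothetical class, not PySpark) reproduces the pitfall in plain Python.

```python
class Frame:
    """Hypothetical stand-in for a DataFrame with immutable columns."""

    def __init__(self, columns):
        self.columns = dict(columns)

    def with_column(self, name, values):
        # Like withColumn(): build a new object and leave self untouched
        new_columns = dict(self.columns)
        new_columns[name] = values
        return Frame(new_columns)

    def show(self):
        # Like show(): print, then implicitly return None
        print(self.columns)


prices = Frame({"price": [0, 12, 10]})

broken = prices.with_column("half_price", [p / 2 for p in prices.columns["price"]]).show()
print(broken)  # None: the new Frame was printed, then discarded

fixed = prices.with_column("half_price", [p / 2 for p in prices.columns["price"]])
fixed.show()
print(list(prices.columns))  # the original Frame still has only "price"
```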



# **16.4.3 Functions: Spark and Python**

---



**Create a df** by importing a file from Amazon's **Simple Storage Service (S3)**:

---



In [23]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url ="https://s3.amazonaws.com/dataviz-curriculum/day_1/wine.csv"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("wine.csv"), sep=",", header=True)

# Show DataFrame
df.show()

+-------+--------------------+--------------------+------+-----+------------------+--------------------+-----------------+------------------+--------------------+
|country|         description|         designation|points|price|          province|            region_1|         region_2|           variety|              winery|
+-------+--------------------+--------------------+------+-----+------------------+--------------------+-----------------+------------------+--------------------+
|     US|This tremendous 1...|   Martha's Vineyard|    96|  235|        California|         Napa Valley|             Napa|Cabernet Sauvignon|               Heitz|
|  Spain|Ripe aromas of fi...|Carodorum Selecci...|    96|  110|    Northern Spain|                Toro|             null|     Tinta de Toro|Bodega Carmen Rod...|
|     US|Mac Watson honors...|Special Selected ...|    96|   90|        California|      Knights Valley|           Sonoma|   Sauvignon Blanc|            Macauley|
|     US|This spent 20

**Transformations** are instructions for computation. Spark records them but does not run them until an action is called.

---



In [24]:
# Order a DataFrame by descending points
df.orderBy(df["points"].desc())

# ...since it's a transformation, nothing happens yet; Spark only returns the DataFrame's schema

DataFrame[country: string, description: string, designation: string, points: string, price: string, province: string, region_1: string, region_2: string, variety: string, winery: string]

**Actions** direct Spark to perform the computation instructions and return the result.

---



In [25]:
# Add show() (an action) to the transformation above so Spark runs it
df.orderBy(df["points"].desc()).show(5)

+-------+--------------------+--------------------+------+-----+----------+-----------+--------+--------------------+--------------------+
|country|         description|         designation|points|price|  province|   region_1|region_2|             variety|              winery|
+-------+--------------------+--------------------+------+-----+----------+-----------+--------+--------------------+--------------------+
|     US|This is an absolu...|           IX Estate|    99|  290|California|Napa Valley|    Napa|           Red Blend|              Colgin|
| France|98-100 Barrel sam...|       Barrel sample|    99| null|  Bordeaux|   Pauillac|    null|Bordeaux-style Re...|Château Pontet-Canet|
|     US|There are incredi...|Elevation 1147 Es...|    99|  150|California|Napa Valley|    Napa|  Cabernet Sauvignon|        David Arthur|
| France|A magnificent Cha...|Dom Pérignon Oeno...|    99|  385| Champagne|  Champagne|    null|     Champagne Blend|      Moët & Chandon|
|  Italy|Even better than .
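The schema printed for `df` lists `points` as `string`, because the wine CSV was read without a schema. Ordering a string column compares text character by character, so a value such as `"100"` would sort below `"99"`. The plain-Python comparison below shows the difference; the sample values are made up for illustration. In PySpark, casting before sorting, for example `df.orderBy(df["points"].cast("int").desc())`, gives a numeric order.

```python
# Hypothetical sample of points values, stored as strings as in the wine DataFrame
points = ["96", "99", "100", "88"]

# Text ordering: compares character by character, so "100" < "88" < "96" < "99"
text_desc = sorted(points, reverse=True)

# Numeric ordering: convert first, as cast("int") would in Spark
numeric_desc = sorted(points, key=int, reverse=True)

print(text_desc)     # ['99', '96', '88', '100']
print(numeric_desc)  # ['100', '99', '96', '88']
```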

Spark Functions:

---



In [26]:
# Import functions
from pyspark.sql.functions import avg
df.select(avg("points")).show()


# transformation: avg(), select()
# action: show()

+-----------------+
|      avg(points)|
+-----------------+
|87.88834105383143|
+-----------------+
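`avg()` works here even though `points` is a string column, because Spark implicitly casts it to a number. Like SQL `AVG`, it also skips null values instead of counting them as zero. The helper below, `average` (hypothetical, plain Python), sketches that behavior on a made-up sample with one missing price.

```python
def average(values):
    """Average numeric strings the way SQL AVG treats them: skip nulls (None)."""
    numbers = [float(v) for v in values if v is not None]
    if not numbers:
        return None  # AVG over only nulls is null
    return sum(numbers) / len(numbers)


prices = ["235", "110", None, "90"]  # hypothetical sample with a missing price
print(average(prices))  # 145.0: three values averaged, the None is ignored
```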



In [27]:
# Filter by price and show only certain columns
df.filter("price<20").select(['points', 'country', 'winery', 'price']).show(5)

# Note: the whole condition "price<20" is in quotes because filter() also accepts a SQL expression string

# transformation: filter(), select()
# action: show()

+------+--------+--------------------+-----+
|points| country|              winery|price|
+------+--------+--------------------+-----+
|    90|Bulgaria|        Villa Melnik|   15|
|    90|   Spain|      Don Bernardino|   17|
|    90|      US|            De Loach|   18|
|    91|      US|   Trinity Vineyards|   19|
|    91|Portugal|Adega Cooperativa...|   15|
+------+--------+--------------------+-----+
only showing top 5 rows
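A filter keeps a row only when its condition is true. For rows whose `price` is null (like the Bordeaux barrel sample above), `price<20` evaluates to null rather than true or false, so those rows are dropped, and they would also be dropped by `price>=20`. The sketch below mimics `filter()` followed by `select()` on a few rows copied from the outputs above, using `None` for null; `where_select` and `less_than` are hypothetical helpers.

```python
def where_select(rows, predicate, columns):
    """Keep rows whose predicate is True (None counts as not true), then project columns."""
    return [{c: row[c] for c in columns} for row in rows if predicate(row) is True]


def less_than(value, limit):
    # SQL-style comparison: a null operand gives null (None), not False
    if value is None:
        return None
    return float(value) < limit


wines = [
    {"points": "90", "country": "Bulgaria", "price": "15"},
    {"points": "99", "country": "France", "price": None},
    {"points": "96", "country": "US", "price": "235"},
]
cheap = where_select(wines, lambda r: less_than(r["price"], 20), ["points", "country", "price"])
print(cheap)  # only the Bulgarian row; the null-priced row is not kept
```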



The same filters, in both the SQL context (expression strings) and the Python context (column expressions):

---



In [28]:
# Filter with a SQL expression string
df.filter("price<20").show(5)

# Filter by price on certain columns
df.filter("price<20").select(['points','country', 'winery','price']).show(5)

# Filter on an exact country with a Python column expression
df.filter(df["country"] == "US").show(5)

+--------+--------------------+-----------+------+-----+----------+--------------------+-----------------+--------------+--------------------+
| country|         description|designation|points|price|  province|            region_1|         region_2|       variety|              winery|
+--------+--------------------+-----------+------+-----+----------+--------------------+-----------------+--------------+--------------------+
|Bulgaria|This Bulgarian Ma...|    Bergulé|    90|   15|  Bulgaria|                null|             null|        Mavrud|        Villa Melnik|
|   Spain|Earthy plum and c...|     Amandi|    90|   17|   Galicia|       Ribeira Sacra|             null|        Mencía|      Don Bernardino|
|      US|There's a lot to ...|       null|    90|   18|California|Russian River Valley|           Sonoma|    Chardonnay|            De Loach|
|      US|Massively fruity,...|       null|    91|   19|    Oregon|   Willamette Valley|Willamette Valley|    Pinot Gris|   Trinity Vineyards|

**Skill Drill:**

---

Using both the SQL and Python contexts, use filtering to find the rows for bottles of wine that cost more than $15 and come from California.



In [29]:
# Filter by price and province (Python context)
cond1 = df["province"] == "California"
cond2 = df["price"] > 15
df.where(cond1 & cond2).show()

+-------+--------------------+--------------------+------+-----+----------+--------------------+-------------+------------------+--------------------+
|country|         description|         designation|points|price|  province|            region_1|     region_2|           variety|              winery|
+-------+--------------------+--------------------+------+-----+----------+--------------------+-------------+------------------+--------------------+
|     US|This tremendous 1...|   Martha's Vineyard|    96|  235|California|         Napa Valley|         Napa|Cabernet Sauvignon|               Heitz|
|     US|Mac Watson honors...|Special Selected ...|    96|   90|California|      Knights Valley|       Sonoma|   Sauvignon Blanc|            Macauley|
|     US|The producer sour...|Gap's Crown Vineyard|    95|   60|California|        Sonoma Coast|       Sonoma|        Pinot Noir|           Blue Farm|
|     US|This blockbuster,...|     Rainin Vineyard|    95|  325|California|Diamond Mountain ..
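Building `cond1` and `cond2` separately, or wrapping each comparison in parentheses, matters because Python's `&` binds more tightly than `==` and `>`. Written inline without parentheses, `df["province"] == "California" & df["price"] > 15` is parsed as a chained comparison around `"California" & df["price"]`, which in PySpark typically fails with a "Cannot convert column into bool" error. The sketch below uses only the standard `ast` module to show how Python groups the two spellings; `province` and `price` are placeholder names.

```python
import ast

# Without parentheses: & is applied first, producing a chained comparison
inline = ast.parse('province == "California" & price > 15', mode="eval").body
# With parentheses (or separate cond1/cond2 variables): & combines two comparisons
grouped = ast.parse('(province == "California") & (price > 15)', mode="eval").body

print(type(inline).__name__, [type(op).__name__ for op in inline.ops])
print(type(grouped).__name__, type(grouped.op).__name__)
```

For the SQL context of the drill, the same filter can be written as a single expression string, for example `df.filter("province = 'California' AND price > 15")`, where `AND` has the expected precedence.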