In [1]:
!pwd

/Users/felipegomez/GitHub/PythonPractice


In [2]:
# Set the PySpark environment variables
import os
# os.environ['SPARK_HOME'] = "/Users/felipegomez/Spark"
# os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
# os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'notebook'
# os.environ['PYSPARK_PYTHON'] = 'python'

In [3]:
# Import Spark
from pyspark.sql import SparkSession

In [4]:
# Create A SparkSession
spark = SparkSession.builder \
    .appName("PySpark-Get-Started") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/17 01:03:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Test The Setup
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, ["Name", "Age"])
df.show()

                                                                                

+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+



In [6]:
sc = spark.sparkContext

In [7]:
sc

In [8]:
sc.stop() # or spark.stop()

In [9]:
# OLD WAY OF STARTING A SPARKCONTEXT SPARK 1.X

from pyspark import SparkContext

In [10]:
sc = SparkContext(appName = "MySparkApplication")

In [11]:
sc # or just use 'spark'

In [12]:
sc.stop()

In [13]:
# SPARK RDD AND RDD OPERATIONS

In [40]:
spark = SparkSession.builder.appName("RDD-Demo").getOrCreate()

In [15]:
# HOW TO CREATE RDDs

In [16]:
numbers = [1,2,3,4,5]
rdd = spark.sparkContext.parallelize(numbers)

In [17]:
rdd.collect()

[1, 2, 3, 4, 5]

In [18]:
data = [('Alice', 25), ('Bob', 30), ('Charlie', 35), ('Alice', 40)]

In [19]:
rdd = spark.sparkContext.parallelize(data)

In [20]:
print("All elements of the rdd: ", rdd.collect())

All elements of the rdd:  [('Alice', 25), ('Bob', 30), ('Charlie', 35), ('Alice', 40)]


In [30]:
# RDD OPERATIONS: ACTIONS

In [21]:
count = rdd.count()
print("The total number of elements in rdd: ", count)

[Stage 2:>                                                        (0 + 12) / 12]

The total number of elements in rdd:  4


                                                                                

In [41]:
sc = spark.sparkContext
sc

In [42]:
spark

In [27]:
first_element = rdd.first()
print("The first element of the rdd: ", first_element)

The first element of the rdd:  ('Alice', 25)


In [28]:
taken_elements = rdd.take(2)
print("The first two elements of the rdd: ", taken_elements)

The first two elements of the rdd:  [('Alice', 25), ('Bob', 30)]


In [29]:
rdd.foreach(lambda x: print(x))

('Alice', 25)
('Bob', 30)
('Charlie', 35)
('Alice', 40)


In [31]:
# RDD OPERATIONS: TRANSFORMATIONS

In [33]:
mapped_rdd = rdd.map(lambda x: (x[0].upper(), x[1]))
result = mapped_rdd.collect()
print("rdd with uppercase name: ", result)

rdd with uppercase name:  [('ALICE', 25), ('BOB', 30), ('CHARLIE', 35), ('ALICE', 40)]


In [34]:
filtered_rdd = rdd.filter(lambda x: x[1] > 30)
filtered_rdd.collect()

[('Charlie', 35), ('Alice', 40)]

In [35]:
reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)
reduced_rdd.collect()

                                                                                

[('Alice', 65), ('Bob', 30), ('Charlie', 35)]

In [36]:
sorted_rdd = rdd.sortBy(lambda x: x[1], ascending = False)
sorted_rdd.collect()

[('Alice', 40), ('Charlie', 35), ('Bob', 30), ('Alice', 25)]

In [37]:
# READ/WRITE RDDs FROM/TO TEXT FILE

In [38]:
rdd.saveAsTextFile("output.txt")

In [43]:
rdd_text = spark.sparkContext.textFile('output.txt')
rdd_text.collect()

["('Alice', 40)", "('Bob', 30)", "('Alice', 25)", "('Charlie', 35)"]

In [44]:
spark.stop()

In [45]:
# SPARK DATAFRAMES

In [59]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc

In [60]:
spark = SparkSession.builder.appName("DataFrame-Demo").getOrCreate()

In [61]:
# USING RDDs

In [62]:
rdd = spark.sparkContext.textFile("data.txt")

In [63]:
result_rdd = rdd.flatMap(lambda line: line.split(" ")) \
 .map(lambda word: (word, 1))\
 .reduceByKey(lambda a, b: a + b)\
 .sortBy(lambda x: x[1], ascending = False)

In [64]:
result_rdd.take(1)

[('the', 12)]

In [65]:
rdd

data.txt MapPartitionsRDD[118] at textFile at NativeMethodAccessorImpl.java:0

In [66]:
result_rdd.collect()

[('the', 12),
 ('of', 7),
 ('a', 7),
 ('distributed', 5),
 ('in', 5),
 ('Spark', 4),
 ('as', 3),
 ('is', 3),
 ('API', 3),
 ('on', 3),
 ('Dataset', 3),
 ('RDD', 3),
 ('and', 2),
 ('results', 2),
 ('its', 2),
 ('data', 2),
 ('cluster', 2),
 ('that', 2),
 ('The', 2),
 ('was', 2),
 ('API.', 2),
 ('RDDs', 2),
 ('MapReduce', 2),
 ('programs', 2),
 ('function', 2),
 ('resilient', 1),
 ('dataset', 1),
 ('read-only', 1),
 ('multiset', 1),
 ('machines,', 1),
 ('Dataframe', 1),
 ('released', 1),
 ('an', 1),
 ('by', 1),
 ('1.x,', 1),
 ('primary', 1),
 ('interface', 1),
 ('(API),', 1),
 ('but', 1),
 ('use', 1),
 ('encouraged', 1),
 ('even', 1),
 ('though', 1),
 ('deprecated.', 1),
 ('technology', 1),
 ('still', 1),
 ('', 1),
 ('were', 1),
 ('response', 1),
 ('to', 1),
 ('limitations', 1),
 ('computing', 1),
 ('paradigm,', 1),
 ('forces', 1),
 ('structure', 1),
 ('programs:', 1),
 ('read', 1),
 ('input', 1),
 ('from', 1),
 ('disk,', 1),
 ('map', 1),
 ('data,', 1),
 ('map,', 1),
 ('disk.', 1),
 ("Spa

In [67]:
# USING DATAFRAMES

In [68]:
df = spark.read.text("data.txt")

In [69]:
result_df = df.selectExpr("explode(split(value, '')) as word").groupBy("word").count().orderBy(desc("count"))

In [70]:
result_df.take(10)

[Row(word=' ', count=160),
 Row(word='e', count=84),
 Row(word='t', count=83),
 Row(word='a', count=80),
 Row(word='r', count=61),
 Row(word='s', count=55),
 Row(word='i', count=54),
 Row(word='o', count=47),
 Row(word='n', count=40),
 Row(word='d', count=39)]

In [71]:
# DATAFRAME CODE IS MORE CONCISE AND CLOSER TO SQL-LIKE SYNTAX.

In [72]:
# CONCLUSION: DATAFRAMES ARE USER-FRIENDLY SANDE OPTIMIZED APPROACH FRO STRUCTURED DATA IN APACHE SPARK.

In [103]:
# METHOD 1 TO USE BASH SHELL CLI!!!

In [106]:
%%bash 
head -10 ./products.csv

id,name,category,quantity,price
1,iPhone 12,Electronics,10,899.99
2,Nike Air Max 90,Clothing,25,119.99
3,KitchenAid Stand Mixer,Home Appliances,5,299.99
4,The Great Gatsby,Books,50,12.99
5,L'Oreal Paris Mascara,Beauty,100,9.99
6,Yoga Mat,Sports,30,29.99
7,Samsung 4K Smart TV,Electronics,8,799.99
8,Levi's Jeans,Clothing,15,49.99
9,Dyson Vacuum Cleaner,Home Appliances,3,399.99


In [107]:
# METHOD 2 TO USE BASH SHELL CLI!!!
! head -10 ./products.csv

id,name,category,quantity,price
1,iPhone 12,Electronics,10,899.99
2,Nike Air Max 90,Clothing,25,119.99
3,KitchenAid Stand Mixer,Home Appliances,5,299.99
4,The Great Gatsby,Books,50,12.99
5,L'Oreal Paris Mascara,Beauty,100,9.99
6,Yoga Mat,Sports,30,29.99
7,Samsung 4K Smart TV,Electronics,8,799.99
8,Levi's Jeans,Clothing,15,49.99
9,Dyson Vacuum Cleaner,Home Appliances,3,399.99


In [108]:
# READ OUR CSV FILE INTO A DATAFRAME

In [109]:
# METHOD 1 WHERE SPARK AUTOMATICALLY DEFINES OUR SCHEMA

df_csv = spark.read.csv('products.csv', header=True)
df_csv.show()

+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  1|           iPhone 12|    Electronics|      10|899.99|
|  2|     Nike Air Max 90|       Clothing|      25|119.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  4|    The Great Gatsby|          Books|      50| 12.99|
|  5|L'Oreal Paris Mas...|         Beauty|     100|  9.99|
|  6|            Yoga Mat|         Sports|      30| 29.99|
|  7| Samsung 4K Smart TV|    Electronics|       8|799.99|
|  8|        Levi's Jeans|       Clothing|      15| 49.99|
|  9|Dyson Vacuum Cleaner|Home Appliances|       3|399.99|
| 10| Harry Potter Series|          Books|      20| 15.99|
| 11|        MAC Lipstick|         Beauty|      75| 16.99|
| 12|Adidas Running Shoes|         Sports|      22| 59.99|
| 13|       PlayStation 5|    Electronics|      12|499.99|
| 14|   Hooded Sweatshirt|       Clothing|      10| 34.9

In [110]:
df_csv.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- price: string (nullable = true)



In [111]:
# READ CSV WITH AN EXPLICIT SCHEMA DEFINITION

In [114]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

In [121]:
# METHOD 2 WHERE WE DEFINE OUR SCHEMA

schema = StructType([StructField(name="id", dataType = IntegerType(), nullable = True),
                     StructField(name="name", dataType = StringType(), nullable = True),
                     StructField(name="category", dataType = StringType(), nullable = True),
                     StructField(name="quantity", dataType = IntegerType(), nullable = True),
                     StructField(name="price", dataType = DoubleType(), nullable = True)
                    ])

In [122]:
df_csv2 = spark.read.csv("products.csv", header=True, schema=schema)

In [124]:
df_csv2.show(5)

+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  1|           iPhone 12|    Electronics|      10|899.99|
|  2|     Nike Air Max 90|       Clothing|      25|119.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  4|    The Great Gatsby|          Books|      50| 12.99|
|  5|L'Oreal Paris Mas...|         Beauty|     100|  9.99|
+---+--------------------+---------------+--------+------+
only showing top 5 rows



In [125]:
df_csv2.head(5)

[Row(id=1, name='iPhone 12', category='Electronics', quantity=10, price=899.99),
 Row(id=2, name='Nike Air Max 90', category='Clothing', quantity=25, price=119.99),
 Row(id=3, name='KitchenAid Stand Mixer', category='Home Appliances', quantity=5, price=299.99),
 Row(id=4, name='The Great Gatsby', category='Books', quantity=50, price=12.99),
 Row(id=5, name="L'Oreal Paris Mascara", category='Beauty', quantity=100, price=9.99)]

In [127]:
df_csv2.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)



In [128]:
df_csv3 = spark.read.csv("products.csv", header=True, inferSchema=True)
df_csv3.show()

+---+--------------------+---------------+--------+------+
| id|                name|       category|quantity| price|
+---+--------------------+---------------+--------+------+
|  1|           iPhone 12|    Electronics|      10|899.99|
|  2|     Nike Air Max 90|       Clothing|      25|119.99|
|  3|KitchenAid Stand ...|Home Appliances|       5|299.99|
|  4|    The Great Gatsby|          Books|      50| 12.99|
|  5|L'Oreal Paris Mas...|         Beauty|     100|  9.99|
|  6|            Yoga Mat|         Sports|      30| 29.99|
|  7| Samsung 4K Smart TV|    Electronics|       8|799.99|
|  8|        Levi's Jeans|       Clothing|      15| 49.99|
|  9|Dyson Vacuum Cleaner|Home Appliances|       3|399.99|
| 10| Harry Potter Series|          Books|      20| 15.99|
| 11|        MAC Lipstick|         Beauty|      75| 16.99|
| 12|Adidas Running Shoes|         Sports|      22| 59.99|
| 13|       PlayStation 5|    Electronics|      12|499.99|
| 14|   Hooded Sweatshirt|       Clothing|      10| 34.9

In [129]:
df_csv3.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)



In [130]:
# READING JSON FILES INTO DATAFRAME

In [134]:
# READING SINGLE FILE JSON FILES

# EACH ROW IS A JSON RECORD, RECORDS ARE SEPARATED BY A NEW LINE

In [135]:
%%bash
head -10 products_singleline.json

{"id":1,"name":"iPhone 12","category":"Electronics","quantity":10,"price":899.99}
{"id":2,"name":"Nike Air Max 90","category":"Clothing","quantity":25,"price":119.99}
{"id":3,"name":"KitchenAid Stand Mixer","category":"Home Appliances","quantity":5,"price":299.99}
{"id":4,"name":"The Great Gatsby","category":"Books","quantity":50,"price":12.99}
{"id":5,"name":"L'Oreal Paris Mascara","category":"Beauty","quantity":100,"price":9.99}
{"id":6,"name":"Yoga Mat","category":"Sports","quantity":30,"price":29.99}
{"id":7,"name":"Samsung 4K Smart TV","category":"Electronics","quantity":8,"price":799.99}
{"id":8,"name":"Levi's Jeans","category":"Clothing","quantity":15,"price":49.99}
{"id":9,"name":"Dyson Vacuum Cleaner","category":"Home Appliances","quantity":3,"price":399.99}
{"id":10,"name":"Harry Potter Series","category":"Books","quantity":20,"price":15.99}


In [136]:
df_json = spark.read.json('products_singleline.json')
df_json.show()

+---------------+---+--------------------+------+--------+
|       category| id|                name| price|quantity|
+---------------+---+--------------------+------+--------+
|    Electronics|  1|           iPhone 12|899.99|      10|
|       Clothing|  2|     Nike Air Max 90|119.99|      25|
|Home Appliances|  3|KitchenAid Stand ...|299.99|       5|
|          Books|  4|    The Great Gatsby| 12.99|      50|
|         Beauty|  5|L'Oreal Paris Mas...|  9.99|     100|
|         Sports|  6|            Yoga Mat| 29.99|      30|
|    Electronics|  7| Samsung 4K Smart TV|799.99|       8|
|       Clothing|  8|        Levi's Jeans| 49.99|      15|
|Home Appliances|  9|Dyson Vacuum Cleaner|399.99|       3|
|          Books| 10| Harry Potter Series| 15.99|      20|
|         Beauty| 11|        MAC Lipstick| 16.99|      75|
|         Sports| 12|Adidas Running Shoes| 59.99|      22|
|    Electronics| 13|       PlayStation 5|499.99|      12|
|       Clothing| 14|   Hooded Sweatshirt| 34.99|      1

In [137]:
df_json.printSchema()

root
 |-- category: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: long (nullable = true)



In [138]:
# READ MULTILINES FOR JSON FILES

In [153]:
%%bash 
head -30 ./products_multiline.json

[
  {
    "id": 1,
    "name": "iPhone 12",
    "category": "Electronics",
    "quantity": 10,
    "price": 899.99
  },
  {
    "id": 2,
    "name": "Nike Air Max 90",
    "category": "Clothing",
    "quantity": 25,
    "price": 119.99
  },
  {
    "id": 3,
    "name": "KitchenAid Stand Mixer",
    "category": "Home Appliances",
    "quantity": 5,
    "price": 299.99
  },
  {
    "id": 4,
    "name": "The Great Gatsby",
    "category": "Books",
    "quantity": 50,
    "price": 12.99
  },
  {


In [155]:
df_json2 = spark.read.json("products_multiline.json", multiLine = True)
df_json2.show()

+---------------+---+--------------------+------+--------+
|       category| id|                name| price|quantity|
+---------------+---+--------------------+------+--------+
|    Electronics|  1|           iPhone 12|899.99|      10|
|       Clothing|  2|     Nike Air Max 90|119.99|      25|
|Home Appliances|  3|KitchenAid Stand ...|299.99|       5|
|          Books|  4|    The Great Gatsby| 12.99|      50|
|         Beauty|  5|L'Oreal Paris Mas...|  9.99|     100|
|         Sports|  6|            Yoga Mat| 29.99|      30|
|    Electronics|  7| Samsung 4K Smart TV|799.99|       8|
|       Clothing|  8|        Levi's Jeans| 49.99|      15|
|Home Appliances|  9|Dyson Vacuum Cleaner|399.99|       3|
|          Books| 10| Harry Potter Series| 15.99|      20|
|         Beauty| 11|        MAC Lipstick| 16.99|      75|
|         Sports| 12|Adidas Running Shoes| 59.99|      22|
|    Electronics| 13|       PlayStation 5|499.99|      12|
|       Clothing| 14|   Hooded Sweatshirt| 34.99|      1

In [156]:
df_json2.printSchema()

root
 |-- category: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: long (nullable = true)



In [None]:
# READ AND WRITE PARQUET FILES INTO DATAFRAME