In [14]:
from pyspark.sql import SparkSession

In [15]:
#Create a SparkSession
spark = SparkSession.builder.appName("Dataframe").getOrCreate()

## Using RDDs

In [36]:
rdd = spark.sparkContext.textFile("data/input_data.txt")

In [37]:
result_rdd = rdd.flatMap(lambda line:line.split(" ")).map(lambda word: (word,1)).reduceByKey(lambda a,b: a + b).sortBy(lambda x: x[1],ascending=False)

In [38]:
result_rdd.collect()

[('and', 7),
 ('the', 7),
 ('for', 6),
 ('of', 5),
 ('a', 5),
 ('data', 5),
 ('faster', 4),
 ('', 4),
 ('dataset', 4),
 ('Spark', 4),
 ('in', 4),
 ('on', 4),
 ('also', 4),
 ('reviews', 4),
 ('to', 3),
 ('as', 3),
 ('around', 3),
 ('or', 3),
 ('Dataset:', 3),
 ('contains', 3),
 ('The', 3),
 ('includes', 3),
 ('more', 2),
 ('you', 2),
 ('it', 2),
 ('have', 2),
 ('at', 2),
 ('Review', 2),
 ('hotel', 2),
 ('car', 2),
 ('25,000', 2),
 ('Twitter', 2),
 ('over', 2),
 ('one', 2),
 ('This', 2),
 ('which', 2),
 ('tweets', 2),
 ('general', 1),
 ('lets', 1),
 ('up', 1),
 ('100x', 1),
 ('memory,', 1),
 ('disk,', 1),
 ('than', 1),
 ('Last', 1),
 ('took', 1),
 ('Hadoop', 1),
 ('by', 1),
 ('completing', 1),
 ('TB', 1),
 ('Daytona', 1),
 ('contest', 1),
 ('3x', 1),
 ('tenth', 1),
 ('number', 1),
 ('machines', 1),
 ('write', 1),
 ('80', 1),
 ('To', 1),
 ('demonstrate', 1),
 ('this,', 1),
 ('let’s', 1),
 ('“Hello', 1),
 ('World!”', 1),
 ('Word', 1),
 ('Count', 1),
 ('example.', 1),
 ('reviews:', 1),
 ('T

## Using DataFrames

In [67]:
from pyspark.sql.functions import desc

In [68]:
df = spark.read.text("data/input_data.txt")

result_df = df.selectExpr("explode(split(value, ' ')) as word").groupBy("word").count().orderBy(desc("count"))

In [69]:
result_df.take(10)

[Row(word='the', count=7),
 Row(word='and', count=7),
 Row(word='for', count=6),
 Row(word='data', count=5),
 Row(word='of', count=5),
 Row(word='a', count=5),
 Row(word='on', count=4),
 Row(word='Spark', count=4),
 Row(word='dataset', count=4),
 Row(word='in', count=4)]

In [70]:
result_df.show()

+--------+-----+
|    word|count|
+--------+-----+
|     the|    7|
|     and|    7|
|     for|    6|
|    data|    5|
|      of|    5|
|       a|    5|
|      in|    4|
|      on|    4|
| dataset|    4|
|   Spark|    4|
|  faster|    4|
|        |    4|
| reviews|    4|
|    also|    4|
|contains|    3|
|includes|    3|
|Dataset:|    3|
|  around|    3|
|     The|    3|
|      as|    3|
+--------+-----+
only showing top 20 rows



In [9]:
%%bash
head -10 ./data/customers.csv

Index,Customer Id,First Name,Last Name,Company,City,Country,Phone 1,Phone 2,Email,Subscription Date,Website
1,DD37Cf93aecA6Dc,Sheryl,Baxter,Rasmussen Group,East Leonard,Chile,229.077.5154,397.884.0519x718,zunigavanessa@smith.info,2020-08-24,http://www.stephenson.com/
2,1Ef7b82A4CAAD10,Preston,Lozano,Vega-Gentry,East Jimmychester,Djibouti,5153435776,686-620-1820x944,vmata@colon.com,2021-04-23,http://www.hobbs.com/
3,6F94879bDAfE5a6,Roy,Berry,Murillo-Perry,Isabelborough,Antigua and Barbuda,+1-539-402-0259,(496)978-3969x58947,beckycarr@hogan.com,2020-03-25,http://www.lawrence.com/
4,5Cef8BFA16c5e3c,Linda,Olsen,"Dominguez, Mcmillan and Donovan",Bensonview,Dominican Republic,001-808-617-6467x12895,+1-813-324-8756,stanleyblackwell@benson.org,2020-06-02,http://www.good-lyons.com/
5,053d585Ab6b3159,Joanna,Bender,"Martin, Lang and Andrade",West Priscilla,Slovakia (Slovak Republic),001-234-203-0635x76146,001-199-446-3860x3486,colinalvarado@miles.net,2021-04-17,https://goodwin-ingram.com/
6,2d08F

## Read CSV with header

In [22]:
# Read CSV file into DataFrame
cust_df = spark.read.csv("data/customers.csv",header=True)

In [23]:
# Display schema of DataFrame
cust_df.printSchema()

root
 |-- Index: string (nullable = true)
 |-- Customer Id: string (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Last Name: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Phone 1: string (nullable = true)
 |-- Phone 2: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Subscription Date: string (nullable = true)
 |-- Website: string (nullable = true)



## Read CSV with an explicit schema definition

In [24]:
# import necessary types
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

In [36]:
# Define the schema
schema = StructType([
    StructField(name="Index", dataType=IntegerType(), nullable=True),
    StructField(name="Customer Id", dataType=StringType(), nullable=True),
    StructField(name="First Name", dataType=StringType(), nullable=True),
    StructField(name="Last Name", dataType=StringType(), nullable=True),
    StructField(name="Company", dataType=StringType(), nullable=True)
])

In [37]:
# Read CSV file into DataFrame with schema definition
csv_file_path = "./data/customers.csv"
df = spark.read.csv(csv_file_path, header=True,schema=schema)

In [38]:
# Display schema of DataFrame
df.printSchema()

# Display content of DataFrame
df.show(5)

root
 |-- Index: integer (nullable = true)
 |-- Customer Id: string (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Last Name: string (nullable = true)
 |-- Company: string (nullable = true)

+-----+---------------+----------+---------+--------------------+
|Index|    Customer Id|First Name|Last Name|             Company|
+-----+---------------+----------+---------+--------------------+
|    1|DD37Cf93aecA6Dc|    Sheryl|   Baxter|     Rasmussen Group|
|    2|1Ef7b82A4CAAD10|   Preston|   Lozano|         Vega-Gentry|
|    3|6F94879bDAfE5a6|       Roy|    Berry|       Murillo-Perry|
|    4|5Cef8BFA16c5e3c|     Linda|    Olsen|Dominguez, Mcmill...|
|    5|053d585Ab6b3159|    Joanna|   Bender|Martin, Lang and ...|
+-----+---------------+----------+---------+--------------------+
only showing top 5 rows



## Read CSV with inferSchema

In [39]:
# Read CSV file into DataFrame with inferSchema
csv_file_path = "./data/customers.csv"
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

In [40]:
# Display schema of DataFrame
df.printSchema()

# Display content of DataFrame
df.show(5)

root
 |-- Index: integer (nullable = true)
 |-- Customer Id: string (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Last Name: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Phone 1: string (nullable = true)
 |-- Phone 2: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Subscription Date: date (nullable = true)
 |-- Website: string (nullable = true)

+-----+---------------+----------+---------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------+
|Index|    Customer Id|First Name|Last Name|             Company|             City|             Country|             Phone 1|             Phone 2|               Email|Subscription Date|             Website|
+-----+---------------+----------+---------+--------------------+-----------------+------------------

## Read JSON file into DataFrame

### Single Line JSON

In [43]:
# Read single line JSON
# Each row is a JSON record, records are separated by new line
json_file_path = "./data/products_singleline.json"
df = spark.read.json(json_file_path)

In [44]:
# Display schema of DataFrame
df.printSchema()

# Display content of DataFrame
df.show(5)

root
 |-- category: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: long (nullable = true)

+---------------+---+--------------------+------+--------+
|       category| id|                name| price|quantity|
+---------------+---+--------------------+------+--------+
|    Electronics|  1|           iPhone 12|899.99|      10|
|       Clothing|  2|     Nike Air Max 90|119.99|      25|
|Home Appliances|  3|KitchenAid Stand ...|299.99|       5|
|          Books|  4|    The Great Gatsby| 12.99|      50|
|         Beauty|  5|L'Oreal Paris Mas...|  9.99|     100|
+---------------+---+--------------------+------+--------+
only showing top 5 rows



### Multi-lines JSON

In [54]:
# Read multi-line JSON
# JSON is an array of record, records are separated by a comma.
# each record is defined in multiple lines
json_file_path = "./data/products_multiline.json"
df = spark.read.json(json_file_path, multiLine=True)

In [55]:
# Display schema of DataFrame
df.printSchema()

# Display content of DataFrame
df.show(5)

root
 |-- category: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: long (nullable = true)

+---------------+---+--------------------+------+--------+
|       category| id|                name| price|quantity|
+---------------+---+--------------------+------+--------+
|    Electronics|  1|           iPhone 12|899.99|      10|
|       Clothing|  2|     Nike Air Max 90|119.99|      25|
|Home Appliances|  3|KitchenAid Stand ...|299.99|       5|
|          Books|  4|    The Great Gatsby| 12.99|      50|
|         Beauty|  5|L'Oreal Paris Mas...|  9.99|     100|
+---------------+---+--------------------+------+--------+
only showing top 5 rows



In [65]:
# write dataframe into parquet file
parquet_file_path = "./data/products.parquet"
df.write.mode('overwrite').partitionBy('category').parquet(parquet_file_path)

### Read parquet file into DataFrame

In [48]:
df = spark.read.parquet(parquet_file_path)

In [49]:
# Display schema of DataFrame
df.printSchema()

# Display content of DataFrame
df.show(5)

root
 |-- category: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: long (nullable = true)

+---------------+---+--------------------+------+--------+
|       category| id|                name| price|quantity|
+---------------+---+--------------------+------+--------+
|    Electronics|  1|           iPhone 12|899.99|      10|
|       Clothing|  2|     Nike Air Max 90|119.99|      25|
|Home Appliances|  3|KitchenAid Stand ...|299.99|       5|
|          Books|  4|    The Great Gatsby| 12.99|      50|
|         Beauty|  5|L'Oreal Paris Mas...|  9.99|     100|
+---------------+---+--------------------+------+--------+
only showing top 5 rows



In [50]:
spark

In [92]:
from pyspark.sql import SparkSession

spark1 = SparkSession.builder \
    .enableHiveSupport() \
    .config("spark.sql.warehouse.dir", "C:/spark-warehouse") \
    .getOrCreate()


In [93]:
csv_file_path = "./data/customers.csv"
df = spark1.read.csv(csv_file_path, header=True, inferSchema=True)

In [94]:
df.write \
    .bucketBy(5, "Index") \
    .sortBy("Index") \
    .mode("overwrite") \
    .saveAsTable("mytable")


In [112]:
df_read = spark1.sql("SELECT `First Name`, Company FROM mytable")

In [113]:
df_read.show(5)

+----------+--------------------+
|First Name|             Company|
+----------+--------------------+
|   Preston|         Vega-Gentry|
|     Aimee|        Steele Group|
|    Sheryl|      Browning-Simon|
|     Jenna|Hoffman, Reed and...|
|   Miranda|  Singleton and Sons|
+----------+--------------------+
only showing top 5 rows



In [115]:
spark1.stop()