## Spark DataFrame Basics

Spark DataFrames allow for easy handling of large datasets.

* Easy syntax
* Ability to query DataFrames directly with SQL
* Operations are automatically distributed across the cluster (DataFrames are built on top of RDDs)

### Create a DataFrame

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.appName("pyspark_basics").getOrCreate()



In [5]:
%%writefile user_simple.json
{"name":"Bob"}
{"name":"Jim", "age":40}
{"name":"Mary", "age": 24}

Writing user_simple.json
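The file above uses the JSON Lines layout: each line is its own JSON object. By default, `spark.read.json` expects exactly this format. A file that holds a single JSON document spread over several lines needs `multiLine=True`. The stdlib-only sketch below is an illustration, not Spark's implementation. It shows why the DataFrame ends up with an `age` column that is null for Bob: the columns are the union of keys across all records, and a record without a key gets a null.

```python
import json

# Same content as user_simple.json: one JSON object per line (JSON Lines).
lines = [
    '{"name":"Bob"}',
    '{"name":"Jim", "age":40}',
    '{"name":"Mary", "age": 24}',
]

records = [json.loads(line) for line in lines]

# Columns are the union of keys across all records, sorted by name
# (matching the age, name order in the DataFrame output).
columns = sorted({key for rec in records for key in rec})

# A record missing a key gets None, which Spark shows as NULL.
rows = [tuple(rec.get(c) for c in columns) for rec in records]

print(columns)  # ['age', 'name']
print(rows)     # [(None, 'Bob'), (40, 'Jim'), (24, 'Mary')]
```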


In [7]:
df = spark.read.json("user_simple.json")
df.show()

+----+----+
| age|name|
+----+----+
|NULL| Bob|
|  40| Jim|
|  24|Mary|
+----+----+



In [8]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [9]:
df.columns

['age', 'name']

In [10]:
df.describe()  # returns a DataFrame of summary statistics; call .show() to display them

DataFrame[summary: string, age: string, name: string]

### Specifying Schema Structure
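 * Spark can infer a schema from some formats (such as the JSON above), but the inferred types may not be what you want: `age` was read as `long`
 * Often you have to set the schema yourself
 * Spark provides classes in `pyspark.sql.types` to specify the structure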

Next, create a list of `StructField` objects, one per column. Each `StructField` takes:

 * `name`: string, the name of the field
 * `dataType`: the field's `DataType` instance, for example `IntegerType()` or `StringType()`
 * `nullable`: boolean, whether the field can be null (`None`)

In [11]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

In [12]:
data_schema = [StructField("age", IntegerType(), True), StructField("name", StringType(), True)]

In [13]:
final_struc = StructType(fields=data_schema)

In [14]:
df = spark.read.json("user_simple.json", schema=final_struc)

In [15]:
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



### Grab Data

In [16]:
df['age']

Column<'age'>

In [18]:
type(df['age'])

pyspark.sql.column.Column

In [19]:
df.select("age")

DataFrame[age: int]

In [20]:
type(df.select("age"))

pyspark.sql.dataframe.DataFrame

In [21]:
df.select("age").show()

+----+
| age|
+----+
|NULL|
|  40|
|  24|
+----+



In [22]:
df.head(2)

[Row(age=None, name='Bob'), Row(age=40, name='Jim')]

In [24]:
# df.head(2).show()  # fails: head(2) returns a Python list of Row objects, and a list has no show() method

In [26]:
df.select(["name", "age"]).show(2)

+----+----+
|name| age|
+----+----+
| Bob|NULL|
| Jim|  40|
+----+----+
only showing top 2 rows



### Create New Columns

In [27]:
df.withColumn('newAge', df['age']).show()

+----+----+------+
| age|name|newAge|
+----+----+------+
|NULL| Bob|  NULL|
|  40| Jim|    40|
|  24|Mary|    24|
+----+----+------+



In [29]:
df.withColumnRenamed("name", "firstName").show()

+----+---------+
| age|firstName|
+----+---------+
|NULL|      Bob|
|  40|      Jim|
|  24|     Mary|
+----+---------+



In [30]:
df.show()  # withColumn and withColumnRenamed returned new DataFrames; df itself is unchanged

+----+----+
| age|name|
+----+----+
|NULL| Bob|
|  40| Jim|
|  24|Mary|
+----+----+



In [31]:
df.withColumn("agePlusTen", df["age"]+10).show()

+----+----+----------+
| age|name|agePlusTen|
+----+----+----------+
|NULL| Bob|      NULL|
|  40| Jim|        50|
|  24|Mary|        34|
+----+----+----------+



In [34]:
df.withColumn("age_minus_five", df["age"]-5).show()

+----+----+--------------+
| age|name|age_minus_five|
+----+----+--------------+
|NULL| Bob|          NULL|
|  40| Jim|            35|
|  24|Mary|            19|
+----+----+--------------+



### Using SQL

In [35]:
df.createOrReplaceTempView("customers")

In [36]:
sql_results = spark.sql("SELECT * FROM customers")
sql_results

DataFrame[age: int, name: string]

In [37]:
sql_results.show()

+----+----+
| age|name|
+----+----+
|NULL| Bob|
|  40| Jim|
|  24|Mary|
+----+----+



In [40]:
spark.sql("SELECT * FROM customers WHERE age<25").show()

+---+----+
|age|name|
+---+----+
| 24|Mary|
+---+----+

