In [2]:
import os

import findspark

# Locate the Spark installation before importing pyspark.
# SPARK_HOME (if set) wins, so the notebook is portable to other machines;
# the author's original absolute path is kept only as a fallback.
SPARK_HOME = os.environ.get('SPARK_HOME', '/home/adilsaid/spark-3.5.1-bin-hadoop3')
findspark.init(SPARK_HOME)

from pyspark.sql import SparkSession

In [3]:
# Start (or reuse) the SparkSession that all later cells use.
spark = (
    SparkSession.builder
    .appName('Basics')
    .getOrCreate()
)

24/03/03 18:48:58 WARN Utils: Your hostname, pop-os resolves to a loopback address: 127.0.1.1; using 192.168.178.97 instead (on interface wlp1s0)
24/03/03 18:48:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/03 18:48:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Load the sample records. No schema is given, so Spark infers the column
# types from the JSON itself (age comes out as long).
df = spark.read.json(path='people.json')
df.show()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [5]:
# Print the inferred schema: column names, types and nullability.
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [6]:
# Column names as a plain Python list.
df.columns

['age', 'name']

In [7]:
# Summary statistics per column. NULLs are not counted (age count is 2 of 3 rows),
# and mean/stddev come back NULL for the string column.
(
    df.describe()
    .show()
)

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   NULL|
| stddev|7.7781745930520225|   NULL|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



In [8]:
# defining the schema explicitly instead of relying on inference

from pyspark.sql.types import StructField, StringType, IntegerType, StructType


In [9]:
# Build the list of StructFields that make up the explicit schema.
# Each field is (column name, data type, nullable); nullable=True allows
# NULL values, which this data needs (Michael has no age).
data_scheme = [
    StructField('age', IntegerType(), nullable=True),
    StructField('name', StringType(), nullable=True),
]

In [10]:
# Wrap the fields in a StructType and read the JSON with that schema instead
# of letting Spark infer one -- age is now an integer rather than a long.
final_struc = StructType(fields=data_scheme)
df = spark.read.json('people.json', schema=final_struc)
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [14]:
# Indexing vs. selecting: df['age'] gives back a Column expression (usable in
# expressions such as df['age']*2), not a DataFrame of values.

type(df['age']) # a Column object

pyspark.sql.column.Column

In [15]:
# To get the column's values back as a DataFrame, select it.
df.select(df['age']).show() # a single-column DataFrame

+----+
| age|
+----+
|NULL|
|  30|
|  19|
+----+



In [16]:
# .show() prints the rows and returns None, so this reports NoneType rather
# than the type of the selected DataFrame; type(df.select('age')) would show
# the DataFrame type itself.
type(df.select('age').show())

+----+
| age|
+----+
|NULL|
|  30|
|  19|
+----+



NoneType

In [22]:
# head() with no argument returns a single Row; head(n) returns a list of the
# first n Rows. Only a cell's last expression is displayed, so the results are
# bound to names instead of being computed and silently discarded.
first_row = df.head()    # a single Row object

first_two = df.head(2)   # list of the first two Row objects

first_two[0]             # indexing the list gives back a Row

Row(age=None, name='Michael')

In [25]:
# Selecting several columns at once: the names can be passed as separate
# arguments (a list works too). The result is a new, lazily evaluated DataFrame.
df.select('age', 'name')

DataFrame[age: int, name: string]

In [28]:
# Add a derived column. withColumn returns a new DataFrame and leaves df as it
# was, so the doubled ages only exist in what is shown here (NULL stays NULL).
df.withColumn('doubleage', df.age * 2).show()

+----+-------+---------+
| age|   name|doubleage|
+----+-------+---------+
|NULL|Michael|     NULL|
|  30|   Andy|       60|
|  19| Justin|       38|
+----+-------+---------+



In [29]:
# Rename a column in a new DataFrame; df itself still has 'age'.
df.withColumnRenamed(existing='age', new='my_new_age').show()

+----------+-------+
|my_new_age|   name|
+----------+-------+
|      NULL|Michael|
|        30|   Andy|
|        19| Justin|
+----------+-------+



In [30]:
# Using plain SQL against the DataFrame:
# register df as a temporary view named 'people' so spark.sql queries can refer
# to it by that name (an existing view with the same name is replaced).

df.createOrReplaceTempView('people') # registering as a sql temp view

In [32]:
# Query the 'people' view with plain SQL; spark.sql returns a DataFrame.
query = "SELECT * FROM people"
results = spark.sql(query)
results.show()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [34]:
# Filter rows with a SQL WHERE clause; only Andy (age 30) matches.
new_results = spark.sql(
    "SELECT * FROM people WHERE age=30"
)
new_results.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

