# Imports

In [30]:
from typing import List
from pyspark.sql import SparkSession

Any time we want to work with Spark, we need a session. `getOrCreate()` returns the already-running session if there is one and creates a new one otherwise:

In [17]:
# Entry point to Spark: getOrCreate() reuses an already-running session if there
# is one, otherwise it builds a new session with the application name 'Practice'.
session = (
    SparkSession.builder
    .appName('Practice')
    .getOrCreate()
)

The session is a regular Python object:

In [18]:
# Evaluating the bare name as the cell's last expression displays the session object.
session

Like any Python object, it lives in memory and has its own identity, which `id()` returns:

In [19]:
# id() returns the object's identity (in CPython this is its memory address).
id(session)

4363117256

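Now I'm going to read the example data, which consists of flat rental offers. The simplest way to read a CSV file with a header row is shown below. This file uses `;` as its delimiter, however, so the next cell also sets the delimiter and asks Spark to infer column types: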

df = session.read.option('header', 'true').csv(path_to_file)

In [25]:
path_to_file: str = 'data/data.csv'
# The file is ';'-separated and its first line holds the column names.
# inferSchema=True asks Spark to scan the data and detect column types
# (see printSchema below: double / timestamp / string) instead of reading
# every column as a plain string.
df = (
    session.read
    .options(delimiter=';', header='true')
    .csv(path_to_file, inferSchema=True)
)

The type of the newly created DataFrame is **pyspark.sql.dataframe.DataFrame**:

In [26]:
# Confirm the reader produced a Spark DataFrame (not a pandas one).
type(df)

pyspark.sql.dataframe.DataFrame

How to get the first n rows?

In [27]:
# Number of leading rows to fetch; head(n) returns them as a list of Row objects.
n = 5

# Use the variable rather than a repeated literal so changing `n` above takes effect.
df.head(n)

[Row(AREA_IN_M2=60.0, PRICE=5000.0, DATE=datetime.datetime(2022, 10, 21, 0, 0), DISTRICT='stare miasto'),
 Row(AREA_IN_M2=45.0, PRICE=2000.0, DATE=datetime.datetime(2022, 10, 21, 0, 0), DISTRICT='podolany'),
 Row(AREA_IN_M2=33.6, PRICE=1700.0, DATE=datetime.datetime(2022, 10, 21, 0, 0), DISTRICT='grunwald'),
 Row(AREA_IN_M2=60.0, PRICE=6000.0, DATE=datetime.datetime(2022, 10, 21, 0, 0), DISTRICT='stare miasto'),
 Row(AREA_IN_M2=43.0, PRICE=2850.0, DATE=datetime.datetime(2022, 10, 21, 0, 0), DISTRICT='łazarz')]

How to print the schema?

In [28]:
# Prints column names, types and nullability as a tree (output below).
df.printSchema()

root
 |-- AREA_IN_M2: double (nullable = true)
 |-- PRICE: double (nullable = true)
 |-- DATE: timestamp (nullable = true)
 |-- DISTRICT: string (nullable = true)



How to get the column names?

In [29]:
# List of column names, in the same order as the schema.
df.columns

['AREA_IN_M2', 'PRICE', 'DATE', 'DISTRICT']

## Simple operations

* Selecting

In [31]:
columns_to_select: List[str] = ['DATE', 'PRICE']

# select() accepts the column names as separate arguments, so unpack the list;
# show() then prints the first 20 rows by default.
df.select(*columns_to_select).show()

+-------------------+------+
|               DATE| PRICE|
+-------------------+------+
|2022-10-21 00:00:00|5000.0|
|2022-10-21 00:00:00|2000.0|
|2022-10-21 00:00:00|1700.0|
|2022-10-21 00:00:00|6000.0|
|2022-10-21 00:00:00|2850.0|
|2022-10-21 00:00:00|2999.0|
|2022-10-21 00:00:00|3200.0|
|2022-10-21 00:00:00|1900.0|
|2022-10-21 00:00:00|2000.0|
|2022-10-21 00:00:00| 100.0|
|2022-10-21 00:00:00|2200.0|
|2022-10-21 00:00:00|1800.0|
|2022-10-21 00:00:00|2300.0|
|2022-10-21 00:00:00|2300.0|
|2022-10-21 00:00:00|3100.0|
|2022-10-21 00:00:00|2300.0|
|2022-10-21 00:00:00|2500.0|
|2022-10-21 00:00:00|3100.0|
|2022-10-21 00:00:00|2200.0|
|2022-10-21 00:00:00|3600.0|
+-------------------+------+
only showing top 20 rows



* Filtering

In [33]:
# where() is an alias of filter(). Combine column conditions with & and wrap each
# one in parentheses, because & binds tighter than == and <= in Python.
df.where(
    (df['DISTRICT'] == 'grunwald') & (df['PRICE'] <= 2000)
).show()

+----------+------+-------------------+--------+
|AREA_IN_M2| PRICE|               DATE|DISTRICT|
+----------+------+-------------------+--------+
|      33.6|1700.0|2022-10-21 00:00:00|grunwald|
|      78.3|1300.0|2022-10-21 00:00:00|grunwald|
|      39.0|2000.0|2022-10-21 00:00:00|grunwald|
|      54.0|2000.0|2022-10-21 00:00:00|grunwald|
|     59.35|2000.0|2022-10-21 00:00:00|grunwald|
|      38.0|1700.0|2022-10-21 00:00:00|grunwald|
|      48.0|2000.0|2022-10-21 00:00:00|grunwald|
|      73.0|2000.0|2022-10-21 00:00:00|grunwald|
|      35.0|1600.0|2022-10-21 00:00:00|grunwald|
|      28.0|1700.0|2022-10-20 00:00:00|grunwald|
|      27.5|1500.0|2022-10-19 00:00:00|grunwald|
|      12.0|1500.0|2022-10-19 00:00:00|grunwald|
|      31.0|1000.0|2022-10-19 00:00:00|grunwald|
|      40.0|1600.0|2022-10-18 00:00:00|grunwald|
|      52.0|1900.0|2022-10-18 00:00:00|grunwald|
|      34.0|1600.0|2022-10-17 00:00:00|grunwald|
|      35.0|1700.0|2022-10-17 00:00:00|grunwald|
|      31.0|1800.0|2

22/11/02 20:47:18 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 839777 ms exceeds timeout 120000 ms
22/11/02 20:47:19 WARN SparkContext: Killing executors is not supported by current scheduler.
