<a href="https://colab.research.google.com/github/Lubaszka/Lubaszka/blob/main/Kopia_notatnika_Spark_Lab_Dataframes1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install Java, Spark, and Findspark
This installs Apache Spark, Java 8, and [Findspark](https://github.com/minrk/findspark), a library that makes it easy for Python to find Spark.

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar -xvf spark-3.1.2-bin-hadoop3.2.tgz > /dev/null
!pip install -q findspark > /dev/null

# Start a SparkSession
This will start a local Spark session.

In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
display(spark)

In [3]:
!ps xxx
#spark.sparkContext.getConf().getAll()

    PID TTY      STAT   TIME COMMAND
      1 ?        Ss     0:00 /sbin/docker-init -- /datalab/run.sh
      7 ?        Sl     0:00 /tools/node/bin/node /datalab/web/app.js
     21 ?        S      0:00 /bin/bash -e /usr/local/colab/bin/oom_monitor.sh
     23 ?        S      0:00 /bin/bash -e /datalab/run.sh
     25 ?        Sl     0:00 /usr/colab/bin/kernel_manager_proxy --listen_port=6000 --target_port=90
     28 ?        Ss     0:00 tail -n +0 -F /root/.config/Google/DriveFS/Logs/drive_fs.txt
     45 ?        Ss     0:00 tail -n +0 -F /root/.config/Google/DriveFS/Logs/dpb.txt
     69 ?        Z      0:19 [python3] <defunct>
     70 ?        S      0:00 python3 /usr/local/bin/colab-fileshim.py
     91 ?        Sl     0:05 /usr/bin/python3 /usr/local/bin/jupyter-notebook --debug --transport="i
     92 ?        Sl     0:00 /usr/local/bin/dap_multiplexer --domain_socket_path=/tmp/debugger_rj4rf
    228 ?        Ssl    0:03 /usr/bin/python3 -m colab_kernel_launcher -f /root/.local/share/j

# Dataframes

DataFrames in Apache Spark offer a structured way to represent and manipulate distributed data. They provide a familiar table-like structure with named columns and rows, making them easier to work with compared to raw RDDs (Resilient Distributed Datasets).


**DataFrames:**

* **Structured Data:** Organized with named columns and defined data types for each column.
* **SQL-like Operations:** Allow filtering, aggregation, and joining data using familiar SQL syntax.
* **Readability:** Easier to understand due to clear structure and column names.
* **Type Safety:** Enforce data types, reducing the risk of errors during operations.
* **Integration:** Seamless integration with Spark SQL and machine learning libraries for streamlined workflows.

**Comparison with RDDs:**

| Feature        | DataFrames                                     | RDDs                                         |
|----------------|-------------------------------------------------|------------------------------------------------|
| Structure      | Structured with named columns and data types   | Flat collection of rows, each potentially having different data types |
| Operations     | SQL-like operations for filtering, aggregation | Transformations like `map`, `filter`, `groupBy` |
| Readability     | Easier to read and understand                     | Requires knowledge of row structure and data types |
| Type Safety    | Enforces data types for columns                 | No inherent type safety                        |
| Integration    | Better integration with other Spark libraries     | More flexibility for low-level data processing |

**Choosing Between DataFrames and RDDs:**

DataFrames are generally preferred for most data analysis tasks due to their readability, type safety, and compatibility with other Spark functionalities. However, RDDs can be useful for:

* **Low-Level Control:** When fine-grained control over data processing is needed.
* **Custom Data Types:** Handling data types not natively supported by DataFrames.



# Use Spark!
That's all there is to it - you're ready to use Spark!

In [4]:
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
matplotlib.style.use('ggplot')

In [6]:
from pyspark.sql.functions import col, udf, trim, isnull
from pyspark.sql.types import FloatType, IntegerType

In [21]:
def to_float (s) :
   return float(s.replace(',','.'))
float_udf = udf(to_float , FloatType())

In [8]:
# Use wget to get data files
# Note: works for small files, in reality we would use a distributed or object storage such as HDFS or S3
!wget https://raw.githubusercontent.com/dice-dydakt/datasets/main/airports_all.csv -O airports.csv
!wget https://raw.githubusercontent.com/dice-dydakt/datasets/main/countries%20of%20the%20world.csv


# Alternatively you can mount your google drive (more persistent)
#from google.colab import drive
#drive.mount('/content/drive')
#example path: 'drive/MyDrive/data_science'

--2025-03-08 12:08:28--  https://raw.githubusercontent.com/dice-dydakt/datasets/main/airports_all.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 754309 (737K) [text/plain]
Saving to: ‘airports.csv’


2025-03-08 12:08:28 (13.1 MB/s) - ‘airports.csv’ saved [754309/754309]

--2025-03-08 12:08:28--  https://raw.githubusercontent.com/dice-dydakt/datasets/main/countries%20of%20the%20world.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.110.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 38303 (37K) [text/plain]
Saving to: ‘countries of the world.csv’


2025-03-08 12:08:28 (3.71 

In [9]:
# Paths to files
countries_path = '/content/countries of the world.csv'
airports_path = '/content/airports.csv'


#Example paths if on google drive
#countries_path = '/content/drive/MyDrive/data_science/countries of the world.csv'
#airports_path = '/content/drive/MyDrive/data_science/airports.csv'

In [10]:
%time
# Tak można ustawić wyświetlanie większej liczby wierszy w Jupyterze
# pd.set_option('display.max_rows', 120)

countries = spark.read.csv(countries_path,inferSchema=True,header=True)
display(countries.limit(10).toPandas())  # correct - why?
# display(countries.toPandas().head(10)) # wrong - why?
countries.printSchema()

CPU times: user 2 µs, sys: 1 µs, total: 3 µs
Wall time: 6.68 µs


Unnamed: 0,Country,Region,Population,Area (sq. mi.),Pop. Density (per sq. mi.),Coastline (coast/area ratio),Net migration,Infant mortality (per 1000 births),GDP ($ per capita),Literacy (%),Phones (per 1000),Arable (%),Crops (%),Other (%),Climate,Birthrate,Deathrate,Agriculture,Industry,Service
0,Afghanistan,ASIA (EX. NEAR EAST),31056997,647500,480,0,2306,16307,700,360,32,1213,22,8765,1.0,466,2034,38.0,24.0,38.0
1,Albania,EASTERN EUROPE,3581655,28748,1246,126,-493,2152,4500,865,712,2109,442,7449,3.0,1511,522,232.0,188.0,579.0
2,Algeria,NORTHERN AFRICA,32930091,2381740,138,4,-39,31,6000,700,781,322,25,9653,1.0,1714,461,101.0,6.0,298.0
3,American Samoa,OCEANIA,57794,199,2904,5829,-2071,927,8000,970,2595,10,15,75,2.0,2246,327,,,
4,Andorra,WESTERN EUROPE,71201,468,1521,0,66,405,19000,1000,4972,222,0,9778,3.0,871,625,,,
5,Angola,SUB-SAHARAN AFRICA,12127071,1246700,97,13,0,19119,1900,420,78,241,24,9735,,4511,242,96.0,658.0,246.0
6,Anguilla,LATIN AMER. & CARIB,13477,102,1321,5980,1076,2103,8600,950,4600,0,0,100,2.0,1417,534,4.0,18.0,78.0
7,Antigua & Barbuda,LATIN AMER. & CARIB,69108,443,1560,3454,-615,1946,11000,890,5499,1818,455,7727,2.0,1693,537,38.0,22.0,743.0
8,Argentina,LATIN AMER. & CARIB,39921833,2766890,144,18,61,1518,11200,971,2204,1231,48,8721,3.0,1673,755,95.0,358.0,547.0
9,Armenia,C.W. OF IND. STATES,2976372,29800,999,0,-647,2328,3500,986,1957,1755,23,8015,4.0,1207,823,239.0,343.0,418.0


root
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Population: integer (nullable = true)
 |-- Area (sq. mi.): integer (nullable = true)
 |-- Pop. Density (per sq. mi.): string (nullable = true)
 |-- Coastline (coast/area ratio): string (nullable = true)
 |-- Net migration: string (nullable = true)
 |-- Infant mortality (per 1000 births): string (nullable = true)
 |-- GDP ($ per capita): integer (nullable = true)
 |-- Literacy (%): string (nullable = true)
 |-- Phones (per 1000): string (nullable = true)
 |-- Arable (%): string (nullable = true)
 |-- Crops (%): string (nullable = true)
 |-- Other (%): string (nullable = true)
 |-- Climate: string (nullable = true)
 |-- Birthrate: string (nullable = true)
 |-- Deathrate: string (nullable = true)
 |-- Agriculture: string (nullable = true)
 |-- Industry: string (nullable = true)
 |-- Service: string (nullable = true)



In [12]:
# Rename columns (e.g. remove dots which Spark doesn't like)
from pyspark.sql import DataFrame
def rename_columns_lowercase_underscore(df: DataFrame) -> DataFrame:
    for col_name in df.columns:
        new_col_name = col_name.lower().replace(" ", "_").replace(".", "")
        df = df.withColumnRenamed(col_name, new_col_name)
    return df
countries_df = rename_columns_lowercase_underscore(countries)
countries_df.printSchema()

root
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- area_(sq_mi): integer (nullable = true)
 |-- pop_density_(per_sq_mi): string (nullable = true)
 |-- coastline_(coast/area_ratio): string (nullable = true)
 |-- net_migration: string (nullable = true)
 |-- infant_mortality_(per_1000_births): string (nullable = true)
 |-- gdp_($_per_capita): integer (nullable = true)
 |-- literacy_(%): string (nullable = true)
 |-- phones_(per_1000): string (nullable = true)
 |-- arable_(%): string (nullable = true)
 |-- crops_(%): string (nullable = true)
 |-- other_(%): string (nullable = true)
 |-- climate: string (nullable = true)
 |-- birthrate: string (nullable = true)
 |-- deathrate: string (nullable = true)
 |-- agriculture: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- service: string (nullable = true)



In [63]:
# Trim string columns (there are invisible white spaces!)
def trim_columns(df, columns_to_trim):
       for column_name in columns_to_trim:
           df = df.withColumn(column_name, trim(col(column_name)))
       return df

from pyspark.sql.functions import trim, col

columns_to_trim = ["country", "region"]
countries_df = trim_columns(countries_df, columns_to_trim)

countries_df.printSchema()

root
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- area_(sq_mi): integer (nullable = true)
 |-- pop_density_(per_sq_mi): float (nullable = true)
 |-- coastline_(coast/area_ratio): string (nullable = true)
 |-- net_migration: string (nullable = true)
 |-- infant_mortality_(per_1000_births): float (nullable = true)
 |-- gdp_($_per_capita): integer (nullable = true)
 |-- literacy_(%): float (nullable = true)
 |-- phones_(per_1000): string (nullable = true)
 |-- arable_(%): string (nullable = true)
 |-- crops_(%): string (nullable = true)
 |-- other_(%): string (nullable = true)
 |-- climate: string (nullable = true)
 |-- birthrate: float (nullable = true)
 |-- deathrate: float (nullable = true)
 |-- agriculture: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- service: string (nullable = true)



In [50]:
# Assignment: write a similar function to convert selected string columns to float:
# "pop_density_(per_sq_mi)", "birthrate", "deathrate", "literacy_(%)", "infant_mortality_(per_1000_births)"
#
# TODO


def convert_columns_to_float(df, columns_to_convert):
    for column_name in columns_to_convert:
        df = df.withColumn(column_name, float_udf(col(column_name)))
    return df

columns_to_convert = ["pop_density_(per_sq_mi)", "birthrate", "deathrate", "literacy_(%)", "infant_mortality_(per_1000_births)"]
countries_df = convert_columns_to_float(countries_df, columns_to_convert)

countries_df.printSchema()



root
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- population: integer (nullable = true)
 |-- area_(sq_mi): integer (nullable = true)
 |-- pop_density_(per_sq_mi): float (nullable = true)
 |-- coastline_(coast/area_ratio): string (nullable = true)
 |-- net_migration: string (nullable = true)
 |-- infant_mortality_(per_1000_births): float (nullable = true)
 |-- gdp_($_per_capita): integer (nullable = true)
 |-- literacy_(%): float (nullable = true)
 |-- phones_(per_1000): string (nullable = true)
 |-- arable_(%): string (nullable = true)
 |-- crops_(%): string (nullable = true)
 |-- other_(%): string (nullable = true)
 |-- climate: string (nullable = true)
 |-- birthrate: float (nullable = true)
 |-- deathrate: float (nullable = true)
 |-- agriculture: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- service: string (nullable = true)



In [51]:
airports = spark.read.csv(airports_path,inferSchema=True,header=True)
display(airports.limit(10).toPandas())
airports.printSchema()

Unnamed: 0,Name,City,Country,IATA,ICAO,Latitude,Longitude,Altitude,Timezone8,DST,Timezone10
0,Goroka Airport,Goroka,Papua New Guinea,GKA,AYGA,-6.08169,145.391998,5282,10,U,Pacific/Port_Moresby
1,Madang Airport,Madang,Papua New Guinea,MAG,AYMD,-5.20708,145.789002,20,10,U,Pacific/Port_Moresby
2,Mount Hagen Kagamuga Airport,Mount Hagen,Papua New Guinea,HGU,AYMH,-5.82679,144.296005,5388,10,U,Pacific/Port_Moresby
3,Nadzab Airport,Nadzab,Papua New Guinea,LAE,AYNZ,-6.569803,146.725977,239,10,U,Pacific/Port_Moresby
4,Port Moresby Jacksons International Airport,Port Moresby,Papua New Guinea,POM,AYPY,-9.44338,147.220001,146,10,U,Pacific/Port_Moresby
5,Wewak International Airport,Wewak,Papua New Guinea,WWK,AYWK,-3.58383,143.669006,19,10,U,Pacific/Port_Moresby
6,Narsarsuaq Airport,Narssarssuaq,Greenland,UAK,BGBW,61.1605,-45.425999,112,-3,E,America/Godthab
7,Godthaab / Nuuk Airport,Godthaab,Greenland,GOH,BGGH,64.190903,-51.678101,283,-3,E,America/Godthab
8,Kangerlussuaq Airport,Sondrestrom,Greenland,SFJ,BGSF,67.012222,-50.711603,165,-3,E,America/Godthab
9,Thule Air Base,Thule,Greenland,THU,BGTL,76.531197,-68.703201,251,-4,E,America/Thule


root
 |-- Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- IATA: string (nullable = true)
 |-- ICAO: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Altitude: integer (nullable = true)
 |-- Timezone8: string (nullable = true)
 |-- DST: string (nullable = true)
 |-- Timezone10: string (nullable = true)



In [52]:
# Note: Dataframes still uses RDD of type Row underneath
airports.rdd.take(1)

[Row(Name='Goroka Airport', City='Goroka', Country='Papua New Guinea', IATA='GKA', ICAO='AYGA', Latitude=-6.081689835, Longitude=145.3919983, Altitude=5282, Timezone8='10', DST='U', Timezone10='Pacific/Port_Moresby')]

# Assignments

* Print all the airports that are to the North from Krakow.
* For each country, find the airport which is closest to the South Pole.
* Plot the coordinates of all the airports. Bonus: plot on the map.
* Plot GDP vs. Phones for all countries in Asia.
* Plot Infant mortality vs. Literacy for top 30 richest and top 30 poorest countries





In [65]:
northern_airports = airports.filter(airports.Latitude > krakow_latitude)

For each country, find the airport which is closest to the South Pole.

In [67]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import min, first

# Create a SparkSession
spark = SparkSession.builder.appName("ClosestAirportToSouthPole").getOrCreate()

# Read the airports data
airports = spark.read.csv("airports.csv", header=True, inferSchema=True)

# Group by country and airport name, find minimum latitude, and get the airport name
closest_airports = airports.groupBy("Country", "Name").agg(min("Latitude").alias("min_latitude")) \
    .orderBy("min_latitude", ascending=True) \
    .groupBy("Country") \
    .agg(first("Name").alias("Airport_Name"), first("min_latitude").alias("min_latitude"))

# Display the result
display(closest_airports.toPandas())

# Stop the SparkSession
spark.stop()

Unnamed: 0,Country,Airport_Name,min_latitude
0,Afghanistan,Zaranj Airport,30.972222
1,Albania,Vlorë Air Base,40.476101
2,Algeria,Bordj Badji Mokhtar Airport,21.375000
3,American Samoa,Pago Pago International Airport,-14.331000
4,Angola,Ngjiva Pereira Airport,-17.043501
...,...,...,...
232,West Bank,Jerusalem Airport,31.864700
233,Western Sahara,Dakhla Airport,23.718300
234,Yemen,Socotra International Airport,12.630700
235,Zambia,Livingstone Airport,-17.821800


lot the coordinates of all the airports. Bonus: plot on the map.

In [72]:
!pip install plotly==5.15.0

import plotly.express as px
import pandas as pd
import plotly.io as pio

airports_pd = airports.toPandas()

fig = px.scatter_mapbox(airports_pd,
                        lat="Latitude",
                        lon="Longitude",
                        hover_name="Name",
                        hover_data=["City", "Country"],  # Display city and country on hover
                        zoom=1,
                        height=600)

fig.update_layout(mapbox_style="open-street-map",
                  margin={"r": 0, "t": 0, "l": 0, "b": 0})  # Remove margins

fig.update_traces(text="Name",  # Display airport names as text
                  textposition="bottom center")  # Position text below markers

pio.renderers.default = "colab"
fig.show()



AttributeError: 'NoneType' object has no attribute 'setCallSite'