<a href="https://colab.research.google.com/github/gregorylira/learning-pyspark/blob/main/Fundamentos_Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## What is Spark?

Spark is a platform for cluster computing. Spark lets you distribute data and computations across a cluster made up of several nodes (separate computers that work on the problem in parallel). Splitting your data makes very large data sets easier to work with because each node processes only a small part of the data.

Deciding whether Spark is the right solution for your problem takes some experience, but you can start by asking questions such as:

- Is my data too big to run on a single machine?
- Can my calculations be easily parallelized?

Install PySpark on Colab:

In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 1.1 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=bdfce189ec36dad7a85704550203887fbbe438a7a91e687e6497a13a265eeab0
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


Mount Google Drive to access the CSV data files:

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from pyspark.sql import SparkSession
# create (or reuse) a local PySpark session
spark = (
    SparkSession.builder
    .master('local') # run locally with a single worker thread ('local[*]' uses all cores)
    .appName("learning_pyspark_01")
    .getOrCreate()
    )

## DataFrame

Spark's core data structure is the Resilient Distributed Dataset (RDD). It is a low-level object that lets Spark distribute work by splitting data across multiple nodes in the cluster. However, RDDs are difficult to work with directly.

In this notebook, I use the Spark DataFrame instead.

The Spark DataFrame is designed to behave much like a SQL table (variables in the columns, observations in the rows). DataFrames are easier to understand than RDDs, and Spark can optimize complex operations on them far better.

When you start modifying and combining columns and rows of data, there are many ways to reach the same result, and some take much longer than others. With RDDs, it is up to the data scientist to work out the most efficient way to express the computation, while the DataFrame implementation has much of this optimization built in: DataFrame queries go through Spark SQL's Catalyst optimizer, which rewrites them into an efficient execution plan.

To work with Spark DataFrames, you need a SparkSession. The `SparkSession.builder ... getOrCreate()` call used above creates the underlying SparkContext for you, or reuses one that already exists. You can think of the SparkContext as your connection to the cluster and the SparkSession as your interface to that connection.

## Listing tables

Your SparkSession has an attribute called `catalog` that exposes metadata about the databases, tables, views, and functions available to the session. This attribute has several methods for extracting that information.

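One of the most useful is the `.listTables()` method. It returns a list of `Table` objects (with the name, the table type, whether the table is temporary, and so on) for the tables and views in the current database, including temporary views.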

In [None]:
print(spark.catalog.listTables())
# the list is empty because no DataFrame has been registered as a table or view yet

[]


## Importing a table and running queries

One of the advantages of the DataFrame interface is that you can run SQL queries against the tables and views registered in your SparkSession.

In this section, I load flights_small.csv. Each row of this table is a flight that left Portland International Airport (PDX) or Seattle-Tacoma International Airport (SEA) in 2014 or 2015.

In [None]:
flight_path = "./drive/MyDrive/learning/spark/flights_small.csv"
flights = (
    spark.read
    .option("inferSchema", "true")  # scan the data to infer each column's type
    .option("header", "true")       # the first line holds the column names
    .csv(flight_path)
)
# inferSchema guesses column types; if a column comes out with the wrong type, fix it with
# withColumn("NewNameColumn", col("ColumnWrongType").cast("integer")).drop("ColumnWrongType")

In [None]:
print((flights.count(), len(flights.columns))) # (rows, columns), like pandas' df.shape

(10000, 16)


In [None]:
flights.show(10) # print the first 10 rows (similar to pandas' df.head(10))

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

In [None]:
flights.printSchema() # print each column's name, type, and nullability

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



In [None]:
from pyspark.sql.functions import col

In [None]:
flights = (
    flights
    .withColumn("new_air_time", col("air_time").cast("integer"))  # air_time was inferred as string; cast it to integer
    .drop("air_time")
)

# Why create another column? Spark DataFrames are immutable: withColumn and drop return a new DataFrame,
# and assigning the result back to the flights variable replaces the old DataFrame with the corrected one

In [None]:
flights = flights.withColumnRenamed("new_air_time","air_time") # rename the cast column back to its original name

In [None]:
flights.createOrReplaceTempView("flights") # register the DataFrame as a temporary view so Spark SQL can query it; it now shows up in spark.catalog.listTables()

query = "FROM flights SELECT * LIMIT 10" # Spark SQL also accepts this FROM-first form of SELECT * FROM flights LIMIT 10

flights10 = spark.sql(query)

flights10.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|distance|hour|minute|air_time|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     954|   6|    58|     132|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|    2677|  10|    40|     360|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     679|  14|    43|     111|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|     569|  17|     5|      83|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     937|   7|    54|     127|
|2014|    1| 15|    1037|        7|    1

Temporary views in Spark SQL (like the one created above) are session-scoped and disappear when the session that created them terminates. If you want a temporary view that is shared across all sessions and stays alive until the Spark application terminates, you can create a global temporary view. Global temporary views are tied to the system-preserved database global_temp, so you must reference them by their qualified name, e.g. global_temp.flights.

In [None]:
flights.createOrReplaceGlobalTempView("flights") # the OrReplace variant avoids an error if this cell is run twice

spark.sql("SELECT * FROM global_temp.flights LIMIT 10").show()