# Chapter 7 Bilingual PySpark: Blending Python and SQL code

This chapter covers

- Drawing a parallel between PySpark’s instruction set and the SQL vocabulary
- Registering data frames as temporary views or tables to query them using Spark SQL
- Using the catalog to create, reference, and delete registered tables for SQL querying
- Translating common data manipulation instructions from Python to SQL, and vice versa
- Using SQL-style clauses inside certain PySpark methods

# Start a PySpark session

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
import pyspark.sql.functions as F
import pyspark.sql.types as T
import helper_functions
import os
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import warnings
import numpy as np
warnings.filterwarnings("ignore")

# change the account name to your email account
account='sli'

# define a root path to access the data in the DataAnalysisWithPythonAndPySpark
root_path='/net/clusterhn/home/'+account+'/isa460/data/'

# check whether a Spark session is already active; if it is, stop it

try:
    if spark:
        spark.stop()
except NameError:
    # no session has been created yet in this notebook
    pass

spark = (SparkSession.builder.appName("Bilingual PySpark: Blending Python and SQL code")
        .config("spark.port.maxRetries", "100")
        .getOrCreate())

# configure the log level (the default is WARN)
spark.sparkContext.setLogLevel('ERROR')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/19 16:47:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/10/19 16:47:07 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.

In [2]:
from helper_functions import displayByGroup

elements = spark.read.csv(
    root_path+"elements/Periodic_Table_Of_Elements.csv",
    header=True,
    inferSchema=True)

displayByGroup(elements, 5)

+------------+---------+------+----------+----------------+---------------+
|AtomicNumber|Element  |Symbol|AtomicMass|NumberofNeutrons|NumberofProtons|
+------------+---------+------+----------+----------------+---------------+
|1           |Hydrogen |H     |1.007     |0               |1              |
|2           |Helium   |He    |4.002     |2               |2              |
|3           |Lithium  |Li    |6.941     |4               |3              |
|4           |Beryllium|Be    |9.012     |5               |4              |
|5           |Boron    |B     |10.811    |6               |5              |
+------------+---------+------+----------+----------------+---------------+
only showing top 5 rows

+-----------------+------+-----+-----+-----------+-------+
|NumberofElectrons|Period|Group|Phase|Radioactive|Natural|
+-----------------+------+-----+-----+-----------+-------+
|1                |1     |1    |gas  |NULL       |yes    |
|2                |1     |18   |gas  |NULL       |yes  

# Register a data frame as a SQL table

In [3]:
elements.createOrReplaceTempView("elements_t")

In [4]:
query="select period, count(*) from elements_t where phase='liq' group by period"
spark.sql(query).show(5)

+------+--------+
|period|count(1)|
+------+--------+
|     6|       1|
|     4|       1|
+------+--------+



## Using the Spark catalog to manage SQL tables and views

In [5]:
spark.catalog

<pyspark.sql.catalog.Catalog at 0x7f8d7ec24740>

In [6]:
# list all sql tables
spark.catalog.listTables()

[Table(name='elements_t', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [7]:
# drop sql table/view

spark.catalog.dropTempView("elements_t")

True

# Using SQL-style expressions in PySpark

We will use a public data set from Backblaze, a company that provides cloud storage and backup. Since 2013, Backblaze has published data and statistics on the hard drives in its data center, and over time the focus has shifted toward drive failures and diagnostics. You can download the files from the website (http://mng.bz/4jZa). We use the most recent data, from June 2025.

In [17]:
df=spark.read.csv('/opt/shared/backblaze/', header=True, inferSchema=True)

                                                                                

In [18]:
df.count()

9618781

## Identify the model with the highest failure rate

In [19]:
# select only the useful columns for our query
selected_columns=['date',
 'serial_number',
 'model',
 'capacity_bytes',
 'failure']


df1=df.select(selected_columns)

df1.show(10, False)

+----------+--------------+----------------------------------+--------------+-------+
|date      |serial_number |model                             |capacity_bytes|failure|
+----------+--------------+----------------------------------+--------------+-------+
|2025-06-13|2207E60CC65A  |CT250MX500SSD1                    |250059350016  |0      |
|2025-06-13|2340E87B92B5  |CT250MX500SSD1                    |250059350016  |0      |
|2025-06-13|2340E87B97E8  |CT250MX500SSD1                    |250059350016  |0      |
|2025-06-13|2EGK64VX      |HGST HUH728080ALE604              |8001563222016 |0      |
|2025-06-13|2EHZAKAX      |HGST HUH728080ALE604              |8001563222016 |0      |
|2025-06-13|2EJ02A1X      |HGST HUH728080ALE604              |8001563222016 |0      |
|2025-06-13|7LZ021LA      |Seagate BarraCuda SSD ZA250CM10002|250059350016  |0      |
|2025-06-13|S2ZYJ9CF511681|ST500LM012 HN                     |500107862016  |0      |
|2025-06-13|S2ZYJ9GGB01000|ST500LM012 HN              

In [20]:
# get drive capacity in gigabytes (bytes / 1024^3, i.e., binary gigabytes)

df2=df1.selectExpr("model", "capacity_bytes/power(1024,3) capacity_GB", "date", "failure")

df2.show()

+--------------------+------------------+----------+-------+
|               model|       capacity_GB|      date|failure|
+--------------------+------------------+----------+-------+
|      CT250MX500SSD1|232.88591766357422|2025-06-13|      0|
|      CT250MX500SSD1|232.88591766357422|2025-06-13|      0|
|      CT250MX500SSD1|232.88591766357422|2025-06-13|      0|
|HGST HUH728080ALE604| 7452.036460876465|2025-06-13|      0|
|HGST HUH728080ALE604| 7452.036460876465|2025-06-13|      0|
|HGST HUH728080ALE604| 7452.036460876465|2025-06-13|      0|
|Seagate BarraCuda...|232.88591766357422|2025-06-13|      0|
|       ST500LM012 HN| 465.7617416381836|2025-06-13|      0|
|       ST500LM012 HN| 465.7617416381836|2025-06-13|      0|
|       ST500LM012 HN| 465.7617416381836|2025-06-13|      0|
|       ST500LM012 HN| 465.7617416381836|2025-06-13|      0|
|       ST500LM012 HN| 465.7617416381836|2025-06-13|      0|
|       ST500LM012 HN| 465.7617416381836|2025-06-13|      0|
|       ST500LM012 HN| 4

In [21]:
# calculate failure rate by model

df3=df2.filter(F.col('failure').isNotNull()).groupBy('model', 'capacity_GB', 'failure').count().orderBy('model', 'capacity_GB')

df4=df3.groupBy('model', 'capacity_GB').pivot('failure').sum('count')

df3.show()

df4.show()

                                                                                

+--------------------+------------------+-------+------+
|               model|       capacity_GB|failure| count|
+--------------------+------------------+-------+------+
|      CT250MX500SSD1|232.88591766357422|      0| 21117|
|         DELLBOSS VD|447.06915283203125|      0| 12929|
|HGST HMS5C4040ALE640| 3726.023277282715|      0|   330|
|HGST HMS5C4040BLE640| 3726.023277282715|      0|  5612|
|HGST HMS5C4040BLE640| 3726.023277282715|      1|     1|
|HGST HUH721010ALE600|            9314.0|      0|   598|
|HGST HUH721212ALE600|           11176.0|      0| 78180|
|HGST HUH721212ALE604|           11176.0|      1|    31|
|HGST HUH721212ALE604|           11176.0|      0|400039|
|HGST HUH721212ALN604|           11176.0|      0|299258|
|HGST HUH721212ALN604|           11176.0|      1|    30|
|HGST HUH728080ALE600| 7452.036460876465|      0| 32316|
|HGST HUH728080ALE604| 7452.036460876465|      0|  2370|
|HGST HUS728T8TALE6L4| 7452.036460876465|      0|   570|
|       MTFDDAV240TCB|223.57088



+--------------------+------------------+-------+----+
|               model|       capacity_GB|      0|   1|
+--------------------+------------------+-------+----+
|       ST12000NM0117|           11176.0|    210|NULL|
|      WDC WD5000LPCX| 465.7617416381836|   1440|NULL|
|       MTFDDAV240TCB|223.57088470458984|   2580|NULL|
|       MTFDDAV480TCB|447.13167572021484|     60|NULL|
|       ST500LM012 HN| 465.7617416381836|   2165|NULL|
|       ST12000NM0008|           11176.0| 566560|  46|
|Seagate FireCuda ...| 465.7617416381836|    180|NULL|
|     WUH721816ALE6L4|           14902.0|   3818|NULL|
|       ST24000NM002H|           22352.0| 165035|  12|
|Samsung SSD 850 E...| 931.5133895874023|     60|NULL|
|       ST16000NM005G|           14902.0|    750|NULL|
|     WDC WDS250G2B0A|232.88591766357422|   5004|NULL|
| TOSHIBA MG10ACA20TE|           18627.0| 431902|   8|
|       ST14000NM000J|           13039.0|   8983|   1|
| TOSHIBA MG08ACA16TA|           14902.0|1203454|  43|
|       ST

                                                                                

In [22]:
df5=df4.fillna(0).withColumn('failure_rate', F.col('1')/(F.col('0')+F.col('1'))).orderBy(F.desc('failure_rate'))

df5.show()



+--------------------+-----------------+-------+---+--------------------+
|               model|      capacity_GB|      0|  1|        failure_rate|
+--------------------+-----------------+-------+---+--------------------+
| TOSHIBA MG09ACA16TE|          14902.0|   4705|  1|2.124946876328091...|
|HGST HMS5C4040BLE640|3726.023277282715|   5612|  1|1.781578478531979...|
|       ST14000NM0138|          13039.0|  38690|  6|1.550547860243953E-4|
|       ST12000NM0007|          11176.0|  30217|  4|1.323582939015916E-4|
|       ST14000NM000J|          13039.0|   8983|  1|1.113089937666963...|
|HGST HUH721212ALN604|          11176.0| 299258| 30|1.002378979444548...|
|       ST10000NM0086|           9314.0|  30301|  3|9.899683210137276E-5|
|TOSHIBA MG08ACA16TEY|          14902.0| 153743| 14|9.105276507736233E-5|
|       ST12000NM0008|          11176.0| 566560| 46|8.118516217618592E-5|
|HGST HUH721212ALE604|          11176.0| 400039| 31|7.748643987302223E-5|
|TOSHIBA MG07ACA14TEY|          13039.

                                                                                

In [23]:
# is there a correlation between capacity and failure_rate?

df5.select(F.corr('capacity_GB', 'failure_rate')).show()



+-------------------------------+
|corr(capacity_GB, failure_rate)|
+-------------------------------+
|             0.3752010764071598|
+-------------------------------+



                                                                                

## SQL and PySpark code comparison

In [24]:
df=spark.read.csv('/opt/shared/backblaze/', header=True, inferSchema=True)

selected_columns=['date',
 'serial_number',
 'model',
 'capacity_bytes',
 'failure']


backblaze=df.select(selected_columns)

backblaze.createOrReplaceTempView("backblaze_t")

                                                                                

### Display the top 5 models with the highest maximum capacity


In [29]:
# SQL way
spark.sql(
    """SELECT
           model,
           min(capacity_bytes / pow(1024, 3)) min_GB,
           max(capacity_bytes/ pow(1024, 3)) max_GB
        FROM backblaze_t
        GROUP BY 1
        ORDER BY 3 DESC"""
).show(5)



+--------------------+-------+-------+
|               model| min_GB| max_GB|
+--------------------+-------+-------+
|       ST24000NM002H|22352.0|22352.0|
| WDC WUH722222ALE6L4|20490.0|20490.0|
| TOSHIBA MG10ACA20TE|18627.0|18627.0|
|       ST18000NM000J|16764.0|16764.0|
|TOSHIBA MG08ACA16TEY|14902.0|14902.0|
+--------------------+-------+-------+
only showing top 5 rows



                                                                                

In [32]:
# PySpark way

backblaze.groupby(F.col("model")).agg(
    F.min(F.col("capacity_bytes") / F.pow(F.lit(1024), 3)).alias("min_GB"),
    F.max(F.col("capacity_bytes") / F.pow(F.lit(1024), 3)).alias("max_GB"),
).orderBy(F.col("max_GB"), ascending=False).show(5) 



+--------------------+-------+-------+
|               model| min_GB| max_GB|
+--------------------+-------+-------+
|       ST24000NM002H|22352.0|22352.0|
| WDC WUH722222ALE6L4|20490.0|20490.0|
| TOSHIBA MG10ACA20TE|18627.0|18627.0|
|       ST18000NM000J|16764.0|16764.0|
|TOSHIBA MG08ACA16TEY|14902.0|14902.0|
+--------------------+-------+-------+
only showing top 5 rows



                                                                                

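### Display the top 5 models with the highest maximum capacity, excluding models whose minimum capacity equals their maximum capacity

To filter after grouping, SQL uses HAVING; in PySpark, the equivalent is a where() (or filter()) call placed after agg(). In this data set no model has a minimum capacity different from its maximum, so both versions return an empty result.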

In [33]:
# SQL way
spark.sql(
    """SELECT
           model,
           min(capacity_bytes / pow(1024, 3)) min_GB,
           max(capacity_bytes/ pow(1024, 3)) max_GB
        FROM backblaze_t
        GROUP BY 1
        HAVING min_GB!=max_GB
        ORDER BY 3 DESC"""
).show(5)



+-----+------+------+
|model|min_GB|max_GB|
+-----+------+------+
+-----+------+------+



                                                                                

In [34]:
# PySpark way

backblaze.groupby(F.col("model")).agg(
    F.min(F.col("capacity_bytes") / F.pow(F.lit(1024), 3)).alias("min_GB"),
    F.max(F.col("capacity_bytes") / F.pow(F.lit(1024), 3)).alias("max_GB"),
).where(F.col("min_GB") != F.col("max_GB")).orderBy(F.col("max_GB"), ascending=False).show(5) 



+-----+------+------+
|model|min_GB|max_GB|
+-----+------+------+
+-----+------+------+



                                                                                

## Creating new tables/views using the CREATE keyword

In [35]:
spark.sql(
    """
    CREATE OR REPLACE TEMP VIEW drive_days AS
        SELECT model, count(*) AS drive_days
        FROM backblaze_t
        GROUP BY model"""
)
 
spark.sql(
    """CREATE OR REPLACE TEMP VIEW failures AS
           SELECT model, count(*) AS failures
           FROM backblaze_t
           WHERE failure = 1
           GROUP BY model"""
)

DataFrame[]

In [36]:
spark.catalog.listTables()

[Table(name='backblaze_t', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='drive_days', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='failures', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

### Adding data to our table using UNION and JOIN

In [37]:
backblaze_failure=backblaze.filter(F.col('failure')==1)
backblaze_working=backblaze.filter(F.col('failure')==0)

backblaze_failure.createOrReplaceTempView('backblaze_failure_t')
backblaze_working.createOrReplaceTempView('backblaze_working_t')

In [38]:
# sql way
spark.sql("""
        create or replace temp view backblaze_complete as
        select * from backblaze_failure_t 
        union all
        select * from backblaze_working_t
""")

#pyspark way

backblaze_complete=backblaze_failure.union(backblaze_working)

In [39]:
# left join
spark.sql(
    """select
           drive_days.model,
           drive_days,
           failures
    from drive_days
    left join failures on drive_days.model = failures.model"""
).show(5)



+--------------------+----------+--------+
|               model|drive_days|failures|
+--------------------+----------+--------+
|      SSDSCKKB240GZR|        30|    NULL|
|Seagate FireCuda ...|       180|    NULL|
|         ST8000DM005|       720|    NULL|
|       ST12000NM0007|     30221|       4|
| TOSHIBA MQ01ABF050M|      3264|    NULL|
+--------------------+----------+--------+
only showing top 5 rows



                                                                                

### Organizing your SQL code better through subqueries and common table expressions

In [40]:
spark.sql("select * from backblaze_t").show()

+----------+--------------+--------------------+--------------+-------+
|      date| serial_number|               model|capacity_bytes|failure|
+----------+--------------+--------------------+--------------+-------+
|2025-06-13|  2207E60CC65A|      CT250MX500SSD1|  250059350016|      0|
|2025-06-13|  2340E87B92B5|      CT250MX500SSD1|  250059350016|      0|
|2025-06-13|  2340E87B97E8|      CT250MX500SSD1|  250059350016|      0|
|2025-06-13|      2EGK64VX|HGST HUH728080ALE604| 8001563222016|      0|
|2025-06-13|      2EHZAKAX|HGST HUH728080ALE604| 8001563222016|      0|
|2025-06-13|      2EJ02A1X|HGST HUH728080ALE604| 8001563222016|      0|
|2025-06-13|      7LZ021LA|Seagate BarraCuda...|  250059350016|      0|
|2025-06-13|S2ZYJ9CF511681|       ST500LM012 HN|  500107862016|      0|
|2025-06-13|S2ZYJ9GGB01000|       ST500LM012 HN|  500107862016|      0|
|2025-06-13|S2ZYJ9GGB01001|       ST500LM012 HN|  500107862016|      0|
|2025-06-13|S2ZYJ9GGB01021|       ST500LM012 HN|  500107862016| 

In [41]:
# Finding drive models with highest failure rates using subqueries

spark.sql(
    """
    SELECT
        failures.model,
        failures / drive_days failure_rate
    FROM (
        SELECT
            model,
            count(*) AS drive_days
        FROM backblaze_t
        GROUP BY model) drive_days
    INNER JOIN (
        SELECT
            model,
            count(*) AS failures
        FROM backblaze_t
        WHERE failure = 1
        GROUP BY model) failures
    ON
        drive_days.model = failures.model
    ORDER BY 2 desc
    """
).show(5)



+--------------------+--------------------+
|               model|        failure_rate|
+--------------------+--------------------+
| TOSHIBA MG09ACA16TE|2.124946876328091...|
|HGST HMS5C4040BLE640|1.781578478531979...|
|       ST14000NM0138|1.550547860243953E-4|
|       ST12000NM0007|1.323582939015916E-4|
|       ST14000NM000J|1.113089937666963...|
+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [42]:
# Finding highest failure rates using common table expressions/CTE
spark.sql(
    """
    WITH drive_days as (            
        SELECT                       
            model,                   
            count(*) AS drive_days   
        FROM backblaze_t            
        GROUP BY model),             
    failures as (                    
        SELECT                      
            model,                   
            count(*) AS failures    
        FROM backblaze_t            
        WHERE failure = 1           
        GROUP BY model)              
    SELECT
        failures.model,
        failures / drive_days failure_rate
    FROM drive_days
    INNER JOIN failures
    ON
        drive_days.model = failures.model
    ORDER BY 2 desc
    """
).show(5)




+--------------------+--------------------+
|               model|        failure_rate|
+--------------------+--------------------+
| TOSHIBA MG09ACA16TE|2.124946876328091...|
|HGST HMS5C4040BLE640|1.781578478531979...|
|       ST14000NM0138|1.550547860243953E-4|
|       ST12000NM0007|1.323582939015916E-4|
|       ST14000NM000J|1.113089937666963...|
+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [43]:
# Finding the highest failure rate using Python scope rules

def failure_rate(df):
    drive_days = df.groupby(F.col("model")).agg(   
        F.count(F.col("*")).alias("drive_days")
    )
    failures = (
       df.where(F.col("failure") == 1)
        .groupby(F.col("model"))
        .agg(F.count(F.col("*")).alias("failures"))
    )
    answer = (                                               
        drive_days.join(failures, on="model", how="inner")
        .withColumn("failure_rate", F.col("failures") / F.col("drive_days"))
        .orderBy(F.col("failure_rate").desc())
    )
    return answer

In [44]:
failure_rate(backblaze).show()



+--------------------+----------+--------+--------------------+
|               model|drive_days|failures|        failure_rate|
+--------------------+----------+--------+--------------------+
| TOSHIBA MG09ACA16TE|      4706|       1|2.124946876328091...|
|HGST HMS5C4040BLE640|      5613|       1|1.781578478531979...|
|       ST14000NM0138|     38696|       6|1.550547860243953E-4|
|       ST12000NM0007|     30221|       4|1.323582939015916E-4|
|       ST14000NM000J|      8984|       1|1.113089937666963...|
|HGST HUH721212ALN604|    299288|      30|1.002378979444548...|
|       ST10000NM0086|     30304|       3|9.899683210137276E-5|
|TOSHIBA MG08ACA16TEY|    153757|      14|9.105276507736233E-5|
|       ST12000NM0008|    566606|      46|8.118516217618592E-5|
|HGST HUH721212ALE604|    400070|      31|7.748643987302223E-5|
|TOSHIBA MG07ACA14TEY|     26821|       2|7.456843518138772E-5|
|       ST24000NM002H|    165047|      12| 7.27065623731422E-5|
|        ST8000NM0055|    399813|      2

                                                                                

In [45]:
def most_reliable_drive_for_capacity(data, capacity_GB=2048, precision=0.25, top_n=3):
    """Returns the top 3 drives for a given approximate capacity.
 
    Given a capacity in GB and a precision as a decimal number, we keep the N
    drives where:
 
    - the capacity is between (capacity * 1/(1+precision)), capacity * (1+precision)
    - the failure rate is the lowest
 
    """
    capacity_min = capacity_GB / (1 + precision)
    capacity_max = capacity_GB * (1 + precision)
 
    answer = (
        data.where(f"capacity_GB between {capacity_min} and {capacity_max}")
        .orderBy("failure_rate", "capacity_GB", ascending=[True, False])
        .limit(top_n)                                                     
     )
 
    return answer

In [46]:
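# per-record capacity in GB (not used below)
backblaze1=backblaze.withColumn('capacity_GB', F.col("capacity_bytes") / F.pow(F.lit(1024), 3))

# df5 already holds one row per model with capacity_GB and failure_rate
most_reliable_drive_for_capacity(df5, capacity_GB=11176.0).show()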



+-------------+-----------+----+---+------------+
|        model|capacity_GB|   0|  1|failure_rate|
+-------------+-----------+----+---+------------+
|ST14000NM002J|    13039.0| 150|  0|         0.0|
|ST14000NM0018|    13039.0|1920|  0|         0.0|
|ST12000NM0117|    11176.0| 210|  0|         0.0|
+-------------+-----------+----+---+------------+
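The 13039 GB models qualify for an 11176 GB request because of the precision window. Working the bounds out with the function's defaults (`precision=0.25`):

```python
capacity_GB, precision = 11176.0, 0.25

# Same bounds as in most_reliable_drive_for_capacity.
capacity_min = capacity_GB / (1 + precision)
capacity_max = capacity_GB * (1 + precision)
print(capacity_min, capacity_max)  # 8940.8 13970.0
```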



                                                                                

# Summary

- Spark provides a SQL API for data manipulation. This API supports ANSI SQL.
- Spark (and PySpark, by extension) borrows a lot of vocabulary and expected functionality from the way SQL manipulates tables. This is especially evident since the data manipulation module is called pyspark.sql.
- PySpark’s data frames need to be registered as views or tables before they can be queried with Spark SQL. You can give them a different name than the data frame you’re registering.
- PySpark’s own data frame manipulation methods and functions borrow SQL functionality, for the most part. Some exceptions are documented in the API: for example, union() behaves like SQL’s UNION ALL (it keeps duplicates and matches columns by position) rather than UNION.
- Spark SQL queries can be inserted in a PySpark program through the spark.sql function, where spark is the running SparkSession.
- Spark SQL table references are kept in a Catalog, which contains the metadata for all tables accessible to Spark SQL.
- PySpark will accept SQL-style clauses in where(), expr(), and selectExpr(), which can simplify the syntax for complex filtering and selection.
- When using Spark SQL queries with user-provided input, be careful with sanitizing the inputs to avoid potential SQL injection attacks.

## In class exercise

Display the average drive failure rate by day, ordered by day, and visualize the result.

Use both SQL and PySpark to get the answer. Which one do you prefer?

In [52]:
df=spark.read.csv('/opt/shared/backblaze/', header=True, inferSchema=True)

selected_columns=['date',
 'serial_number',
 'model',
 'capacity_bytes',
 'failure']


backblaze=df.select(selected_columns)

backblaze.createOrReplaceTempView("backblaze_t")

                                                                                

## PySpark way

## SQL way

## visualize the result