In [0]:
# Lendo o arquivo de dados
display(dbutils.fs.ls("/databricks-datasets"))

path,name,size,modificationTime
dbfs:/databricks-datasets/,databricks-datasets/,0,0
dbfs:/databricks-datasets/COVID/,COVID/,0,0
dbfs:/databricks-datasets/README.md,README.md,976,1532468253000
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0,0
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359,1455043490000
dbfs:/databricks-datasets/adult/,adult/,0,0
dbfs:/databricks-datasets/airlines/,airlines/,0,0
dbfs:/databricks-datasets/amazon/,amazon/,0,0
dbfs:/databricks-datasets/asa/,asa/,0,0
dbfs:/databricks-datasets/atlas_higgs/,atlas_higgs/,0,0


In [0]:
# lendo o arquivo de dados
arquivo = "dbfs:/databricks-datasets/flights/"

In [0]:
# Lendo arquivo csv, com cabeçalho e o infraschema sendo definido pelo framework
# inferSchema: Define os tipo de dados para as colunas
df = spark \
.read.format("csv") \
.option("inferSchema","True")\
.option("header","True")\
.csv(arquivo)

In [0]:
# Imprime os datatypes das colunas do dataframe
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- delay: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)



In [0]:
# imprime o tipo da variável df
type(df)

Out[6]: pyspark.sql.dataframe.DataFrame

In [0]:
# Retorna as primeiras 5 linhas do dataframe em formato array
df.take(5)

Out[7]: [Row(date='01011245', delay='6', distance='602', origin='ABE', destination='ATL'),
 Row(date='01020600', delay='-8', distance='369', origin='ABE', destination='DTW'),
 Row(date='01021245', delay='-2', distance='602', origin='ABE', destination='ATL'),
 Row(date='01020605', delay='-4', distance='602', origin='ABE', destination='ATL'),
 Row(date='01031245', delay='-4', distance='602', origin='ABE', destination='ATL')]

In [0]:
# Imprime a quantidade de linhas no dataframe
df.count()

Out[8]: 1392106

In [0]:
# lendo o arquivo previamente com a opção InfraShema off
# Para isso, deverá passar o parâmetro schema e definir o tipo de dados para cada coluna que será trabalhado
df = spark \
.read.format("csv") \
.option("inferSchema","False")\
.option("header","True")\
.csv(arquivo)

In [0]:
from pyspark.sql.functions import max
df.select(max("delay")).take(1)

Out[11]: [Row(max(delay)='995')]

In [0]:
# Filtrando linhas de um dataframe usando filter
df.filter("delay<2").show(2)

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
only showing top 2 rows



In [0]:
# usando where (um alias para o metodo filter)
df.where("delay<2").show(2)

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
only showing top 2 rows



In [0]:
# Ordena o DataFrame pela coluna delay
df.sort("delay").show(5)

+--------------------+-----+--------+------+-----------+
|                date|delay|distance|origin|destination|
+--------------------+-----+--------+------+-----------+
|Abbotsford\tBC\tC...| null|    null|  null|       null|
|Aberdeen\tSD\tUSA...| null|    null|  null|       null|
|Abilene\tTX\tUSA\...| null|    null|  null|       null|
| Akron\tOH\tUSA\tCAK| null|    null|  null|       null|
|Alamosa\tCO\tUSA\...| null|    null|  null|       null|
+--------------------+-----+--------+------+-----------+
only showing top 5 rows



In [0]:
# Ordenando a coluna delay na forma decrescente
from pyspark.sql.functions import asc,desc,expr

df.orderBy(expr("delay desc")).show(10)

+--------------------+-----+--------+------+-----------+
|                date|delay|distance|origin|destination|
+--------------------+-----+--------+------+-----------+
|Abbotsford\tBC\tC...| null|    null|  null|       null|
|Aberdeen\tSD\tUSA...| null|    null|  null|       null|
|Abilene\tTX\tUSA\...| null|    null|  null|       null|
| Akron\tOH\tUSA\tCAK| null|    null|  null|       null|
|Alamosa\tCO\tUSA\...| null|    null|  null|       null|
|Albany\tGA\tUSA\tABY| null|    null|  null|       null|
|Albany\tNY\tUSA\tALB| null|    null|  null|       null|
|Albuquerque\tNM\t...| null|    null|  null|       null|
|Alexandria\tLA\tU...| null|    null|  null|       null|
|Allentown\tPA\tUS...| null|    null|  null|       null|
+--------------------+-----+--------+------+-----------+
only showing top 10 rows



In [0]:
# visualizando estatísticas descritivas
df.describe().show()

+-------+--------------------+--------------------+--------------------+-------+-----------+
|summary|                date|               delay|            distance| origin|destination|
+-------+--------------------+--------------------+--------------------+-------+-----------+
|  count|             1392106|             1391580|             1391579|1391578|    1391578|
|   mean|   2180446.584000322|  12.079802928761449|   690.5508264718184|   null|       null|
| stddev|   838031.1536741031|   38.80773374985648|    513.662815366331|   null|       null|
|    min|"Cap-aux-Meules, ...| airline and rout...| dataset can be f...|    ABE|        ABE|
|    max|  Yuma\tAZ\tUSA\tYUM|                 995|                 999|    YUM|        YUM|
+-------+--------------------+--------------------+--------------------+-------+-----------+



In [0]:
# interando sobre todas as linhas do dataframe
for i in df.collect():
    print(i)
    print(i[0],i[1])

Row(date='01011245', delay='6', distance='602', origin='ABE', destination='ATL')
01011245 6
Row(date='01020600', delay='-8', distance='369', origin='ABE', destination='DTW')
01020600 -8
Row(date='01021245', delay='-2', distance='602', origin='ABE', destination='ATL')
01021245 -2
Row(date='01020605', delay='-4', distance='602', origin='ABE', destination='ATL')
01020605 -4
Row(date='01031245', delay='-4', distance='602', origin='ABE', destination='ATL')
01031245 -4
Row(date='01030605', delay='0', distance='602', origin='ABE', destination='ATL')
01030605 0
Row(date='01041243', delay='10', distance='602', origin='ABE', destination='ATL')
01041243 10
Row(date='01040605', delay='28', distance='602', origin='ABE', destination='ATL')
01040605 28
Row(date='01051245', delay='88', distance='602', origin='ABE', destination='ATL')
01051245 88
Row(date='01050605', delay='9', distance='602', origin='ABE', destination='ATL')
01050605 9
Row(date='01061215', delay='-6', distance='602', origin='ABE', des

In [0]:
# Adicionando uma coluna ao dataframe
df = df.withColumn('Nova Coluna',df['delay']+2)
df.show(10)

+--------+-----+--------+------+-----------+-----------+
|    date|delay|distance|origin|destination|Nova Coluna|
+--------+-----+--------+------+-----------+-----------+
|01011245|    6|     602|   ABE|        ATL|        8.0|
|01020600|   -8|     369|   ABE|        DTW|       -6.0|
|01021245|   -2|     602|   ABE|        ATL|        0.0|
|01020605|   -4|     602|   ABE|        ATL|       -2.0|
|01031245|   -4|     602|   ABE|        ATL|       -2.0|
|01030605|    0|     602|   ABE|        ATL|        2.0|
|01041243|   10|     602|   ABE|        ATL|       12.0|
|01040605|   28|     602|   ABE|        ATL|       30.0|
|01051245|   88|     602|   ABE|        ATL|       90.0|
|01050605|    9|     602|   ABE|        ATL|       11.0|
+--------+-----+--------+------+-----------+-----------+
only showing top 10 rows



In [0]:
# Renomeando uma coluna no dataframe
df.withColumnRenamed('Nova Coluna','New Column').show()


+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
only showing top 10 rows



In [0]:
# Remove coluna
df = df.drop('Nova Coluna')
df.show(10)

+--------+-----+--------+------+-----------+----------+
|    date|delay|distance|origin|destination|New Column|
+--------+-----+--------+------+-----------+----------+
|01011245|    6|     602|   ABE|        ATL|       8.0|
|01020600|   -8|     369|   ABE|        DTW|      -6.0|
|01021245|   -2|     602|   ABE|        ATL|       0.0|
|01020605|   -4|     602|   ABE|        ATL|      -2.0|
|01031245|   -4|     602|   ABE|        ATL|      -2.0|
|01030605|    0|     602|   ABE|        ATL|       2.0|
|01041243|   10|     602|   ABE|        ATL|      12.0|
|01040605|   28|     602|   ABE|        ATL|      30.0|
|01051245|   88|     602|   ABE|        ATL|      90.0|
|01050605|    9|     602|   ABE|        ATL|      11.0|
|01061215|   -6|     602|   ABE|        ATL|      -4.0|
|01061725|   69|     602|   ABE|        ATL|      71.0|
|01061230|    0|     369|   ABE|        DTW|       2.0|
|01060625|   -3|     602|   ABE|        ATL|      -1.0|
|01070600|    0|     369|   ABE|        DTW|    

In [0]:
df.filter("delay is NULL").show()

+--------------------+-----+--------+------+-----------+-----------+
|                date|delay|distance|origin|destination|Nova Coluna|
+--------------------+-----+--------+------+-----------+-----------+
|Abbotsford\tBC\tC...| null|    null|  null|       null|       null|
|Aberdeen\tSD\tUSA...| null|    null|  null|       null|       null|
|Abilene\tTX\tUSA\...| null|    null|  null|       null|       null|
| Akron\tOH\tUSA\tCAK| null|    null|  null|       null|       null|
|Alamosa\tCO\tUSA\...| null|    null|  null|       null|       null|
|Albany\tGA\tUSA\tABY| null|    null|  null|       null|       null|
|Albany\tNY\tUSA\tALB| null|    null|  null|       null|       null|
|Albuquerque\tNM\t...| null|    null|  null|       null|       null|
|Alexandria\tLA\tU...| null|    null|  null|       null|       null|
|Allentown\tPA\tUS...| null|    null|  null|       null|       null|
|Alliance\tNE\tUSA...| null|    null|  null|       null|       null|
|Alpena\tMI\tUSA\tAPN| null|    nu

In [0]:
df.filter(df.delay.isNull()).show(10)

+--------------------+-----+--------+------+-----------+-----------+
|                date|delay|distance|origin|destination|Nova Coluna|
+--------------------+-----+--------+------+-----------+-----------+
|Abbotsford\tBC\tC...| null|    null|  null|       null|       null|
|Aberdeen\tSD\tUSA...| null|    null|  null|       null|       null|
|Abilene\tTX\tUSA\...| null|    null|  null|       null|       null|
| Akron\tOH\tUSA\tCAK| null|    null|  null|       null|       null|
|Alamosa\tCO\tUSA\...| null|    null|  null|       null|       null|
|Albany\tGA\tUSA\tABY| null|    null|  null|       null|       null|
|Albany\tNY\tUSA\tALB| null|    null|  null|       null|       null|
|Albuquerque\tNM\t...| null|    null|  null|       null|       null|
|Alexandria\tLA\tU...| null|    null|  null|       null|       null|
|Allentown\tPA\tUS...| null|    null|  null|       null|       null|
+--------------------+-----+--------+------+-----------+-----------+
only showing top 10 rows



In [0]:
# Preenchendo os dados missing com valor 0
df.na.fill(value=0).show()
# df.na.fill("").show()

+--------+-----+--------+------+-----------+-----------+
|    date|delay|distance|origin|destination|Nova Coluna|
+--------+-----+--------+------+-----------+-----------+
|01011245|    6|     602|   ABE|        ATL|        8.0|
|01020600|   -8|     369|   ABE|        DTW|       -6.0|
|01021245|   -2|     602|   ABE|        ATL|        0.0|
|01020605|   -4|     602|   ABE|        ATL|       -2.0|
|01031245|   -4|     602|   ABE|        ATL|       -2.0|
|01030605|    0|     602|   ABE|        ATL|        2.0|
|01041243|   10|     602|   ABE|        ATL|       12.0|
|01040605|   28|     602|   ABE|        ATL|       30.0|
|01051245|   88|     602|   ABE|        ATL|       90.0|
|01050605|    9|     602|   ABE|        ATL|       11.0|
|01061215|   -6|     602|   ABE|        ATL|       -4.0|
|01061725|   69|     602|   ABE|        ATL|       71.0|
|01061230|    0|     369|   ABE|        DTW|        2.0|
|01060625|   -3|     602|   ABE|        ATL|       -1.0|
|01070600|    0|     369|   ABE

In [0]:
# Preenchendo os dados missing com valor 0 apenas da coluna delay
df.na.fill(value=0,subset=['delay']).show()

In [0]:
# remove qualquer linha nula de qualquer coluna
df.na.drop().show()

+--------+-----+--------+------+-----------+-----------+
|    date|delay|distance|origin|destination|Nova Coluna|
+--------+-----+--------+------+-----------+-----------+
|01011245|    6|     602|   ABE|        ATL|        8.0|
|01020600|   -8|     369|   ABE|        DTW|       -6.0|
|01021245|   -2|     602|   ABE|        ATL|        0.0|
|01020605|   -4|     602|   ABE|        ATL|       -2.0|
|01031245|   -4|     602|   ABE|        ATL|       -2.0|
|01030605|    0|     602|   ABE|        ATL|        2.0|
|01041243|   10|     602|   ABE|        ATL|       12.0|
|01040605|   28|     602|   ABE|        ATL|       30.0|
|01051245|   88|     602|   ABE|        ATL|       90.0|
|01050605|    9|     602|   ABE|        ATL|       11.0|
|01061215|   -6|     602|   ABE|        ATL|       -4.0|
|01061725|   69|     602|   ABE|        ATL|       71.0|
|01061230|    0|     369|   ABE|        DTW|        2.0|
|01060625|   -3|     602|   ABE|        ATL|       -1.0|
|01070600|    0|     369|   ABE