**Instalar sessão**

In [1]:
# Install the dependencies: a headless Java 8 JDK, the Spark 2.4.4 binary
# distribution built for Hadoop 2.7, and the findspark package.
# apt-get runs with -qq and its stdout is redirected to /dev/null to keep the cell quiet.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download the Spark tarball from the Apache archive (-q silences wget).
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
# Unpack the tarball into the current working directory.
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
# Configure the environment variables that Spark needs.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# Make pyspark importable.
import findspark
# Pass the same absolute path stored in SPARK_HOME instead of a relative
# directory name, so the lookup does not depend on the notebook's current
# working directory and both settings always agree.
findspark.init(os.environ["SPARK_HOME"])

In [14]:
# bibliotecas importantes
import pyspark.sql.functions as SF

# **Leitura**

In [3]:
# Start (or reuse) a local Spark session with master 'local[*]'.
# NOTE: despite its name, `sc` holds a SparkSession, not a SparkContext.
from pyspark.sql import SparkSession

sc = (
    SparkSession.builder
    .master('local[*]')
    .getOrCreate()
)

In [None]:
# Load the data from a CSV file using the reader's default options.
df_spark = sc.read.csv("/content/worldcities.csv")

# Inspect the data type of each column. With the default options the columns
# get generic names (_c0 ... _c10) and every one of them is read as string.
df_spark.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)



In [None]:
# Preview the rows; the CSV header line shows up as the first data row here.
df_spark.show()

+------------+------------+--------+--------+-------------+----+----+--------------------+-------+----------+----------+
|         _c0|         _c1|     _c2|     _c3|          _c4| _c5| _c6|                 _c7|    _c8|       _c9|      _c10|
+------------+------------+--------+--------+-------------+----+----+--------------------+-------+----------+----------+
|        city|  city_ascii|     lat|     lng|      country|iso2|iso3|          admin_name|capital|population|        id|
|       Tokyo|       Tokyo| 35.6897|139.6922|        Japan|  JP| JPN|               Tōkyō|primary|  37977000|1392685764|
|     Jakarta|     Jakarta| -6.2146|106.8451|    Indonesia|  ID| IDN|             Jakarta|primary|  34540000|1360771077|
|       Delhi|       Delhi| 28.6600| 77.2300|        India|  IN| IND|               Delhi|  admin|  29617000|1356872604|
|      Mumbai|      Mumbai| 18.9667| 72.8333|        India|  IN| IND|         Mahārāshtra|  admin|  23355000|1356226629|
|      Manila|      Manila| 14.6

Usando header para setar os nomes das colunas corretamente

In [None]:
# header=True takes the column names from the first line of the file.
df_spark1 = sc.read.csv(
    "/content/worldcities.csv",
    header=True,
)

In [None]:
# The columns now carry the names from the CSV header (city, city_ascii, lat, ...).
df_spark1.show()

+------------+------------+--------+--------+-------------+----+----+--------------------+-------+----------+----------+
|        city|  city_ascii|     lat|     lng|      country|iso2|iso3|          admin_name|capital|population|        id|
+------------+------------+--------+--------+-------------+----+----+--------------------+-------+----------+----------+
|       Tokyo|       Tokyo| 35.6897|139.6922|        Japan|  JP| JPN|               Tōkyō|primary|  37977000|1392685764|
|     Jakarta|     Jakarta| -6.2146|106.8451|    Indonesia|  ID| IDN|             Jakarta|primary|  34540000|1360771077|
|       Delhi|       Delhi| 28.6600| 77.2300|        India|  IN| IND|               Delhi|  admin|  29617000|1356872604|
|      Mumbai|      Mumbai| 18.9667| 72.8333|        India|  IN| IND|         Mahārāshtra|  admin|  23355000|1356226629|
|      Manila|      Manila| 14.6000|120.9833|  Philippines|  PH| PHL|              Manila|primary|  23088000|1608618140|
|    Shanghai|    Shanghai| 31.1

In [None]:
# Display the DataFrame repr: each column name with its type (all string at this point).
df_spark1

DataFrame[city: string, city_ascii: string, lat: string, lng: string, country: string, iso2: string, iso3: string, admin_name: string, capital: string, population: string, id: string]

Como podemos ver acima, todas as colunas são do tipo string; vamos corrigir isso usando inferSchema

In [None]:
# inferSchema=True asks Spark to derive each column's type from the data
# instead of reading everything as string.
df_spark2 = sc.read.csv(
    "/content/worldcities.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# Display the DataFrame repr: lat, lng and population are now double, and id is int.
df_spark2

DataFrame[city: string, city_ascii: string, lat: double, lng: double, country: string, iso2: string, iso3: string, admin_name: string, capital: string, population: double, id: int]

In [None]:
# population is now a double, so large values print in scientific notation (e.g. 3.7977E7).
df_spark2.show()

+------------+------------+--------+--------+-------------+----+----+--------------------+-------+----------+----------+
|        city|  city_ascii|     lat|     lng|      country|iso2|iso3|          admin_name|capital|population|        id|
+------------+------------+--------+--------+-------------+----+----+--------------------+-------+----------+----------+
|       Tokyo|       Tokyo| 35.6897|139.6922|        Japan|  JP| JPN|               Tōkyō|primary|  3.7977E7|1392685764|
|     Jakarta|     Jakarta| -6.2146|106.8451|    Indonesia|  ID| IDN|             Jakarta|primary|   3.454E7|1360771077|
|       Delhi|       Delhi|   28.66|   77.23|        India|  IN| IND|               Delhi|  admin|  2.9617E7|1356872604|
|      Mumbai|      Mumbai| 18.9667| 72.8333|        India|  IN| IND|         Mahārāshtra|  admin|  2.3355E7|1356226629|
|      Manila|      Manila|    14.6|120.9833|  Philippines|  PH| PHL|              Manila|primary|  2.3088E7|1608618140|
|    Shanghai|    Shanghai| 31.1

In [None]:
# Print the inferred schema as a tree, one line per column with its type and nullability.
df_spark2.printSchema()

root
 |-- city: string (nullable = true)
 |-- city_ascii: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- country: string (nullable = true)
 |-- iso2: string (nullable = true)
 |-- iso3: string (nullable = true)
 |-- admin_name: string (nullable = true)
 |-- capital: string (nullable = true)
 |-- population: double (nullable = true)
 |-- id: integer (nullable = true)



In [None]:
# Confirm the object is a PySpark DataFrame (pyspark.sql.dataframe.DataFrame).
type(df_spark2)

pyspark.sql.dataframe.DataFrame

# **Leitura: Modo 2 (mais usado)**

In [5]:
# Generic reader form: choose the format, set the reader options, then load
# the path. Both options are passed in a single options() call.
df_spark = (
    sc.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("/content/worldcities.csv")
)

In [6]:
# Display the DataFrame repr (column names and types) as the cell's result;
# a bare last expression is enough, matching the other repr cells in this notebook.
df_spark

DataFrame[city: string, city_ascii: string, lat: double, lng: double, country: string, iso2: string, iso3: string, admin_name: string, capital: string, population: double, id: int]


In [7]:
# Print the schema of the DataFrame loaded with header and inferSchema enabled.
df_spark.printSchema()

root
 |-- city: string (nullable = true)
 |-- city_ascii: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- country: string (nullable = true)
 |-- iso2: string (nullable = true)
 |-- iso3: string (nullable = true)
 |-- admin_name: string (nullable = true)
 |-- capital: string (nullable = true)
 |-- population: double (nullable = true)
 |-- id: integer (nullable = true)



In [8]:
# Confirm the object is a PySpark DataFrame (pyspark.sql.dataframe.DataFrame).
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [10]:
# Print only the first 3 rows as a formatted table.
df_spark.show(3)

+-------+----------+-------+--------+---------+----+----+----------+-------+----------+----------+
|   city|city_ascii|    lat|     lng|  country|iso2|iso3|admin_name|capital|population|        id|
+-------+----------+-------+--------+---------+----+----+----------+-------+----------+----------+
|  Tokyo|     Tokyo|35.6897|139.6922|    Japan|  JP| JPN|     Tōkyō|primary|  3.7977E7|1392685764|
|Jakarta|   Jakarta|-6.2146|106.8451|Indonesia|  ID| IDN|   Jakarta|primary|   3.454E7|1360771077|
|  Delhi|     Delhi|  28.66|   77.23|    India|  IN| IND|     Delhi|  admin|  2.9617E7|1356872604|
+-------+----------+-------+--------+---------+----+----+----------+-------+----------+----------+
only showing top 3 rows



In [12]:
# head(3) returns the first 3 rows as a list of Row objects rather than printing a table.
df_spark.head(3)

[Row(city='Tokyo', city_ascii='Tokyo', lat=35.6897, lng=139.6922, country='Japan', iso2='JP', iso3='JPN', admin_name='Tōkyō', capital='primary', population=37977000.0, id=1392685764),
 Row(city='Jakarta', city_ascii='Jakarta', lat=-6.2146, lng=106.8451, country='Indonesia', iso2='ID', iso3='IDN', admin_name='Jakarta', capital='primary', population=34540000.0, id=1360771077),
 Row(city='Delhi', city_ascii='Delhi', lat=28.66, lng=77.23, country='India', iso2='IN', iso3='IND', admin_name='Delhi', capital='admin', population=29617000.0, id=1356872604)]

# **Select**

In [13]:
# Project three columns by name and print the first 5 rows.
(
    df_spark
    .select("city", "country", "population")
    .show(5)
)

+-------+-----------+----------+
|   city|    country|population|
+-------+-----------+----------+
|  Tokyo|      Japan|  3.7977E7|
|Jakarta|  Indonesia|   3.454E7|
|  Delhi|      India|  2.9617E7|
| Mumbai|      India|  2.3355E7|
| Manila|Philippines|  2.3088E7|
+-------+-----------+----------+
only showing top 5 rows



In [15]:
# List the attributes and methods available on the DataFrame.
dir(df_spark)

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattr__',
 '__getattribute__',
 '__getitem__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_collectAsArrow',
 '_jcols',
 '_jdf',
 '_jmap',
 '_jseq',
 '_lazy_rdd',
 '_repr_html_',
 '_sc',
 '_schema',
 '_sort_cols',
 '_support_repr_html',
 'agg',
 'alias',
 'approxQuantile',
 'cache',
 'checkpoint',
 'coalesce',
 'colRegex',
 'collect',
 'columns',
 'corr',
 'count',
 'cov',
 'createGlobalTempView',
 'createOrReplaceGlobalTempView',
 'createOrReplaceTempView',
 'createTempView',
 'crossJoin',
 'crosstab',
 'cube',
 'describe',
 'distinct',
 'drop',
 'dropDuplicates',
 'drop_duplicates',
 'dropna',
 'dtypes',
 'exceptAll',
 'explain',
 'fillna',
 'filter',
 'first',
 'foreach',
 'f

In [18]:
# List the attributes and methods available on a Column object.
dir(SF.col("city"))

['__add__',
 '__and__',
 '__bool__',
 '__class__',
 '__contains__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__div__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattr__',
 '__getattribute__',
 '__getitem__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__invert__',
 '__iter__',
 '__le__',
 '__lt__',
 '__mod__',
 '__module__',
 '__mul__',
 '__ne__',
 '__neg__',
 '__new__',
 '__nonzero__',
 '__or__',
 '__pow__',
 '__radd__',
 '__rand__',
 '__rdiv__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__rmod__',
 '__rmul__',
 '__ror__',
 '__rpow__',
 '__rsub__',
 '__rtruediv__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__sub__',
 '__subclasshook__',
 '__truediv__',
 '__weakref__',
 '_asc_doc',
 '_asc_nulls_first_doc',
 '_asc_nulls_last_doc',
 '_bitwiseAND_doc',
 '_bitwiseOR_doc',
 '_bitwiseXOR_doc',
 '_contains_doc',
 '_desc_doc',
 '_desc_nulls_first_doc',
 '_desc_nulls_last_doc',
 '_endswith_doc',
 '_eqNullSafe_doc',
 '_isNotNull_doc',
 '_isNull_doc',
 '_j

**Fazendo uma seleção específica**

In [33]:
# The two commented-out lines are alternatives to the active statement: the
# filter condition can be written as a Column expression built with SF.col,
# as a SQL string, or as an attribute reference on the DataFrame.

#filtro_df = df_spark.select(SF.col("city"), SF.col("country"), SF.col("population")).filter(SF.col("population") < 10000)
#filtro_df = df_spark.select(SF.col("city"), SF.col("country"), SF.col("population")).filter("population < 10000")
filtro_df = (
    df_spark
    .select(SF.col("city"), SF.col("country"), SF.col("population"))
    .filter(df_spark.population < 10000)
)

In [34]:
# Print the first 5 rows of the filtered selection (population below 10000).
filtro_df.show(5)

+----------+--------------------+----------+
|      city|             country|population|
+----------+--------------------+----------+
|  Valletta|               Malta|    6444.0|
|Grand Turk|Turks And Caicos ...|    5801.0|
|   Palikir|Micronesia, Feder...|    6227.0|
|  Funafuti|              Tuvalu|    6025.0|
|   Lobamba|           Swaziland|    5800.0|
+----------+--------------------+----------+
only showing top 5 rows



*Outra forma de realizar a operação acima*

In [35]:
# Keep the projection and the predicate in variables, then apply both.
colunas = ["city", "country", "population"]
filtro = SF.col("population") < 10000

(
    df_spark
    .select(*colunas)
    .filter(filtro)
    .show(5)
)

+----------+--------------------+----------+
|      city|             country|population|
+----------+--------------------+----------+
|  Valletta|               Malta|    6444.0|
|Grand Turk|Turks And Caicos ...|    5801.0|
|   Palikir|Micronesia, Feder...|    6227.0|
|  Funafuti|              Tuvalu|    6025.0|
|   Lobamba|           Swaziland|    5800.0|
+----------+--------------------+----------+
only showing top 5 rows



**Aplicando mais de um filtro**

In [45]:
# All columns are wanted this time, so select() is not used.
# Alternative form that chains two filter() calls:
#df_spark.filter(df_spark.population < 10000).filter(df_spark.country == "India").show(5)
# where() can be used instead of filter().
df_spark.filter(
    (SF.col("population") < 10000) & (SF.col("country") == "India")
).show(5)

+----------------+----------------+-------+-------+-------+----+----+----------------+-------+----------+----------+
|            city|      city_ascii|    lat|    lng|country|iso2|iso3|      admin_name|capital|population|        id|
+----------------+----------------+-------+-------+-------+----+----+----------------+-------+----------+----------+
|       Mahadipur|       Mahadipur|24.8566|88.1248|  India|  IN| IND|     West Bengal|   null|    8638.0|1356139009|
|     Drākshārāma|     Draksharama|16.7928|82.0635|  India|  IN| IND|  Andhra Pradesh|   null|    9299.0|1356061765|
|         Chikhli|         Chikhli|  20.75|  73.07|  India|  IN| IND|         Gujarāt|   null|    6953.0|1356649674|
|Niāla Kondapalle|Niala Kondapalle|   17.1|80.0506|  India|  IN| IND|       Telangana|   null|    7767.0|1356698261|
|          Manāli|          Manali|32.2044|  77.17|  India|  IN| IND|Himāchal Pradesh|   null|    8096.0|1356961623|
+----------------+----------------+-------+-------+-------+----+

In [48]:
# Keep rows that satisfy either condition (| combines the two Column predicates).
# where() can be used instead of filter().
df_spark.filter(
    (SF.col("population") < 10000) | (SF.col("country") == "India")
).show(8)

+---------+----------+-------+-------+-------+----+----+-----------+-------+----------+----------+
|     city|city_ascii|    lat|    lng|country|iso2|iso3| admin_name|capital|population|        id|
+---------+----------+-------+-------+-------+----+----+-----------+-------+----------+----------+
|    Delhi|     Delhi|  28.66|  77.23|  India|  IN| IND|      Delhi|  admin|  2.9617E7|1356872604|
|   Mumbai|    Mumbai|18.9667|72.8333|  India|  IN| IND|Mahārāshtra|  admin|  2.3355E7|1356226629|
|  Kolkāta|   Kolkata|22.5411|88.3378|  India|  IN| IND|West Bengal|  admin|   1.756E7|1356060520|
|Bangalore| Bangalore|12.9699| 77.598|  India|  IN| IND|  Karnātaka|  admin|  1.3707E7|1356410365|
|  Chennai|   Chennai|13.0825| 80.275|  India|  IN| IND| Tamil Nādu|  admin|  1.1324E7|1356374944|
|Hyderābād| Hyderabad|17.3667|78.4667|  India|  IN| IND|  Telangana|  admin| 9746000.0|1356871768|
|     Pune|      Pune|18.5196|73.8553|  India|  IN| IND|Mahārāshtra|   null| 7764000.0|1356081074|
|Ahmedabad

*Usando o comando "like" para filtrar*

In [58]:
# SQL-string form of the same filter: df_spark.filter("admin_name like 'Tai%'").show(5)
# Column form: keep rows whose admin_name matches the LIKE pattern "Tai%".
df_spark.filter(SF.col("admin_name").like("Tai%")).show(5)

+--------+----------+--------+--------+-------+----+----+----------+-------+----------+----------+
|    city|city_ascii|     lat|     lng|country|iso2|iso3|admin_name|capital|population|        id|
+--------+----------+--------+--------+-------+----+----+----------+-------+----------+----------+
|Taichung|  Taichung|   24.15|120.6667| Taiwan|  TW| TWN|  Taichung|  admin| 2803894.0|1158689622|
|  Taipei|    Taipei| 25.0478|121.5319| Taiwan|  TW| TWN|    Taipei|primary| 2684567.0|1158881289|
|  Tainan|    Tainan| 22.9833|120.1833| Taiwan|  TW| TWN|    Tainan|  admin| 1883831.0|1158061376|
| Taitung|   Taitung| 22.7583|121.1444| Taiwan|  TW| TWN|   Taitung|  admin|  108905.0|1158804050|
| Nausori|   Nausori|-18.0244|178.5454|   Fiji|  FJ| FJI|   Tailevu|   null|   47604.0|1242640119|
+--------+----------+--------+--------+-------+----+----+----------+-------+----------+----------+
only showing top 5 rows



In [55]:
# SQL-string filter: keep rows whose capital value is 'admin' or 'primary'.
df_spark.filter("capital in ('admin', 'primary')").show(5)

+-------+----------+-------+--------+-----------+----+----+-----------+-------+----------+----------+
|   city|city_ascii|    lat|     lng|    country|iso2|iso3| admin_name|capital|population|        id|
+-------+----------+-------+--------+-----------+----+----+-----------+-------+----------+----------+
|  Tokyo|     Tokyo|35.6897|139.6922|      Japan|  JP| JPN|      Tōkyō|primary|  3.7977E7|1392685764|
|Jakarta|   Jakarta|-6.2146|106.8451|  Indonesia|  ID| IDN|    Jakarta|primary|   3.454E7|1360771077|
|  Delhi|     Delhi|  28.66|   77.23|      India|  IN| IND|      Delhi|  admin|  2.9617E7|1356872604|
| Mumbai|    Mumbai|18.9667| 72.8333|      India|  IN| IND|Mahārāshtra|  admin|  2.3355E7|1356226629|
| Manila|    Manila|   14.6|120.9833|Philippines|  PH| PHL|     Manila|primary|  2.3088E7|1608618140|
+-------+----------+-------+--------+-----------+----+----+-----------+-------+----------+----------+
only showing top 5 rows



In [56]:
# Keep rows whose country name starts with "F".
df_spark.filter(SF.col("country").startswith("F")).show(5)

+--------+----------+--------+---------+----------------+----+----+--------------------+-------+----------+----------+
|    city|city_ascii|     lat|      lng|         country|iso2|iso3|          admin_name|capital|population|        id|
+--------+----------+--------+---------+----------------+----+----+--------------------+-------+----------+----------+
|   Paris|     Paris| 48.8566|   2.3522|          France|  FR| FRA|       Île-de-France|primary|   1.102E7|1250015082|
|    Nice|      Nice| 43.7034|   7.2663|          France|  FR| FRA|Provence-Alpes-Cô...|  minor| 1006402.0|1250774553|
|Helsinki|  Helsinki| 60.1756|  24.9342|         Finland|  FI| FIN|             Uusimaa|primary|  642045.0|1246177997|
| Papeete|   Papeete|-17.5334|-149.5667|French Polynesia|  PF| PYF|        Îles du Vent|primary|  131695.0|1258907380|
|    Suva|      Suva|-18.1333| 178.4333|            Fiji|  FJ| FJI|                Rewa|primary|   88271.0|1242615095|
+--------+----------+--------+---------+--------