In [32]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession


In [33]:
from pyspark.sql.types import *
arqschema = "id INT, nome STRING, status STRING, cidade STRING, vendas INT, data STRING"

In [34]:
# Criar uma sessão Spark
spark = SparkSession.builder \
    .appName("Lendo CSV com PySpark") \
    .getOrCreate()

# Carregar o arquivo CSV em um DataFrame
df = spark.read.csv('dados/despachantes.csv', sep=",", header=False, inferSchema=True,schema=arqschema)

In [35]:
df.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [28]:
#comparando os schemas
df.schema

StructType([StructField('id', IntegerType(), True), StructField('nome', StringType(), True), StructField('status', StringType(), True), StructField('cidade', StringType(), True), StructField('vendas', IntegerType(), True), StructField('data', StringType(), True)])

In [29]:
from pyspark.sql import functions as Func

In [36]:
df.select("id","nome","vendas").where(Func.col("vendas") > 20).show()
#& para and, | para or, e ~ para not
df.select("id","nome","vendas").where((Func.col("vendas") > 20) & (Func.col("vendas") < 40)).show()

+---+-------------------+------+
| id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
|  7|    Noêmia   Orriça|    45|
|  8|      Roque Vásquez|    65|
|  9|      Uriel Queiroz|    54|
+---+-------------------+------+

+---+-------------------+------+
| id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
+---+-------------------+------+



In [38]:
#renomear coluna
novodf = df.withColumnRenamed("nome","nomes")
novodf.columns


['id', 'nomes', 'status', 'cidade', 'vendas', 'data']

In [40]:
from pyspark.sql.functions import *
#coluna data está como string, vamos transformar em texto
despachantes2 = df.withColumn("data2", to_timestamp(Func.col("data"),"yyyy-MM-dd"))
despachantes2.schema

StructType([StructField('id', IntegerType(), True), StructField('nome', StringType(), True), StructField('status', StringType(), True), StructField('cidade', StringType(), True), StructField('vendas', IntegerType(), True), StructField('data', StringType(), True), StructField('data2', TimestampType(), True)])

In [41]:
#operações sobre datas
despachantes2.select(year("data")).show()
despachantes2.select(year("data")).distinct().show()
despachantes2.select("nome",year("data")).orderBy("nome").show()
despachantes2.select("data").groupBy(year("data")).count().show()
despachantes2.select(Func.sum("vendas")).show()

+----------+
|year(data)|
+----------+
|      2020|
|      2020|
|      2020|
|      2020|
|      2020|
|      2019|
|      2019|
|      2020|
|      2018|
|      2020|
+----------+



                                                                                

+----------+
|year(data)|
+----------+
|      2018|
|      2019|
|      2020|
+----------+

+-------------------+----------+
|               nome|year(data)|
+-------------------+----------+
|   Carminda Pestana|      2020|
|    Deolinda Vilela|      2020|
|   Emídio Dornelles|      2020|
|Felisbela Dornelles|      2020|
|     Graça Ornellas|      2020|
|   Matilde Rebouças|      2019|
|    Noêmia   Orriça|      2019|
|      Roque Vásquez|      2020|
|      Uriel Queiroz|      2018|
|   Viviana Sequeira|      2020|
+-------------------+----------+

+----------+-----+
|year(data)|count|
+----------+-----+
|      2018|    1|
|      2019|    2|
|      2020|    7|
+----------+-----+

+-----------+
|sum(vendas)|
+-----------+
|        325|
+-----------+



In [42]:
#salvar, são diretórios
df.write.format("parquet").save("dados/dfimportparquet")
df.write.format("csv").save("dados/dfimportcsv")
df.write.format("json").save("dados/dfimportjson")
df.write.format("orc").save("dados/dfimportorc")


                                                                                

In [47]:
js = spark.read.format("json").load("dados/dfimportjson/part-00000-4ecbc087-cd45-4502-a1a1-bec25aa30c2b-c000.json")
js.show()

+-------------+----------+---+-------------------+------+------+
|       cidade|      data| id|               nome|status|vendas|
+-------------+----------+---+-------------------+------+------+
|  Santa Maria|2020-08-11|  1|   Carminda Pestana| Ativo|    23|
|Novo Hamburgo|2020-03-05|  2|    Deolinda Vilela| Ativo|    34|
| Porto Alegre|2020-02-05|  3|   Emídio Dornelles| Ativo|    34|
| Porto Alegre|2020-02-05|  4|Felisbela Dornelles| Ativo|    36|
| Porto Alegre|2020-02-05|  5|     Graça Ornellas| Ativo|    12|
| Porto Alegre|2019-01-05|  6|   Matilde Rebouças| Ativo|    22|
|  Santa Maria|2019-10-05|  7|    Noêmia   Orriça| Ativo|    45|
| Porto Alegre|2020-03-05|  8|      Roque Vásquez| Ativo|    65|
| Porto Alegre|2018-05-05|  9|      Uriel Queiroz| Ativo|    54|
| Porto Alegre|2020-09-05| 10|   Viviana Sequeira| Ativo|     0|
+-------------+----------+---+-------------------+------+------+



In [50]:
cs = spark.read.format("csv").load("dados/despachantes.csv")
cs.show()

arqschema = "id INT, nome STRING, status STRING, cidade STRING, vendas INT, data STRING"

cs2 = spark.read.format("csv").load("dados/despachantes.csv", schema=arqschema)

+---+-------------------+-----+-------------+---+----------+
|_c0|                _c1|  _c2|          _c3|_c4|       _c5|
+---+-------------------+-----+-------------+---+----------+
|  1|   Carminda Pestana|Ativo|  Santa Maria| 23|2020-08-11|
|  2|    Deolinda Vilela|Ativo|Novo Hamburgo| 34|2020-03-05|
|  3|   Emídio Dornelles|Ativo| Porto Alegre| 34|2020-02-05|
|  4|Felisbela Dornelles|Ativo| Porto Alegre| 36|2020-02-05|
|  5|     Graça Ornellas|Ativo| Porto Alegre| 12|2020-02-05|
|  6|   Matilde Rebouças|Ativo| Porto Alegre| 22|2019-01-05|
|  7|    Noêmia   Orriça|Ativo|  Santa Maria| 45|2019-10-05|
|  8|      Roque Vásquez|Ativo| Porto Alegre| 65|2020-03-05|
|  9|      Uriel Queiroz|Ativo| Porto Alegre| 54|2018-05-05|
| 10|   Viviana Sequeira|Ativo| Porto Alegre|  0|2020-09-05|
+---+-------------------+-----+-------------+---+----------+

