In [None]:
import os
import findspark

os.environ["SPARK_HOME"] = "D:/spark-3.5.5-bin-hadoop3"
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').getOrCreate()


import pandas as pd

In [None]:
PANDAS = pd.DataFrame([])
SPARK = spark.createDataFrame([])

In [None]:
PANDAS = pd.read_csv('TESTE.csv', sep=';')
SPARK = spark.read.csv('TESTE.csv', sep=';', inferSchema=True)
PANDAS_2 = pd.read_csv('TESTE_2.csv', sep=';')
SPARK_2 = spark.read.csv('TESTE_2.csv', sep=';', inferSchema=True)

OBS: os Displays abaixo são apenas para mostrar todos os prints no mesmo output

In [None]:
# O PRINT DO DATAFRAME DO SPARK É TOTALMENTE DIFERENTE, PORÉM ELE POSSUI UMA FUNÇÃO ESPECIFICA PARA FICAR COMO O DF DO PANDAS:
display(PANDAS)
SPARK.toPandas()

In [None]:
display(PANDAS.head(5))
SPARK.limit(5).toPandas()

In [None]:
PANDAS.rename(columns={'TESTE': 'COLUNA_1'}, inplace=True)
SPARK.withColumRenamed('TESTE', 'COLUNA_1')

In [None]:
print(PANDAS.info)
SPARK.printSchema()

In [None]:
from pyspark.sql.types import DoubleType, StringType
from pyspark.sql import functions as f

PANDAS['COLUNA_2'] = PANDAS['COLUNA_2'].str.replace(',', '.')
SPARK = SPARK.withColumn('COLUNA_2', f.regexp_replace('COLUNA_2', ',', '.'))

In [None]:
PANDAS['COLUNA_2'] = PANDAS['COLUNA_2'].astype(float)
SPARK = SPARK.withColumn('COLUNA_2', SPARK['COLUNA_2'].cast(DoubleType()))

In [None]:
# ASSUMINDO QUE O CAMPO DE DATA ESTA COMO INT/LONG, ETC EX 20200924 -> 24/09/2020
PANDAS['COLUNA_3'] = pd.to_datetime(PANDAS['COLUNA_3'], dayfirst=True, errors='coerce')
SPARK = SPARK.withColumn('COLUNA_3', f.to_date(SPARK.COLUNA_3.cast(StringType()), 'yyyMMdd'))#.withColum ...

In [None]:
# SEMELHANTE AO SELECT SQL
PANDAS[['COLUNA_1, COLUNA_2, COLUNA_3']]
SPARK.select('*').show(5, truncate=False)

In [None]:
# O DADO DEVE ESTAR EM FORMATO DATE
PANDAS['COLUNA_4'] = PANDAS['COLUNA_3'].dt.year
SPARK.select(f.year('COLUNA_3').alias('COLUNA_4'))

In [None]:
PANDAS[(pd.isnull(PANDAS['COLUNA_3'])) | (pd.isnull(PANDAS['COLUNA_4']))]
SPARK.select(f.when(f.isnull('COLUNA_3')))

In [None]:
# ELE LEVA EM CONSIDERAÇÃO A TIPAGEM 
PANDAS.fillna(0)
SPARK.na.fill(0)

# PARA NONE:
PANDAS.fillna('-')
SPARK.na.fill('-')

In [None]:
PANDAS.sort_values(by='COLUNA_4', ascending=False)
SPARK.select('*').orderBy('COLUNA_4', ascending=False)

PANDAS.sort_values(by=['COLUNA_4', 'COLUNA_3'], ascending=[False, False])
SPARK.select('*').orderBy(['COLUNA_4', 'COLUNA_3'], ascending=[False, False])

In [None]:
#WHERE E FILTER SÃO EQUIVALENTES NO SPARK
PANDAS[PANDAS['COLUNA_3'] == 50]
SPARK.where('COLUNA_3 == 50')

PANDAS[(PANDAS['COLUNA_3'] == 50) | (PANDAS['COLUNA_3'] == 40)]
SPARK.where('COLUNA_3 == 50').where('COLUNA_3 == 40')

In [None]:
PANDAS[(PANDAS["COLUNA_1"].str.startswith("GABRIEL")) & (PANDAS["COLUNA_1"].str.endswith("TALIETTA"))]
SPARK.filter(COLUNA_1.startwith('GABRIEL')).filter(COLUNA_1.endswith('TALIETTA'))

In [None]:
PANDAS[PANDAS['COLUNA_1'].str.contains('TESTE', case=False)]
SPARK.where(f.upper(SPARK['COLUNA_1']).like('%TESTE%'))

In [None]:
PANDAS[PANDAS['COLUNA_4'] >= 2010]['COLUNA_3'].value_counts().sort_values(by='COLUNA_4')
SPARK.select('COLUNA_3').where('COLUNA_4 >= 2010').groupBy('COLUNA_4').count().orderBy('COLUNA_4')

In [None]:
PANDAS.groupby('COLUNA_5').agg({'COLUNA_6': ['mean'], 
                                'COLUNA_7': ['count']})
SPARK.select('*').groupBy('COLUNA_5').agg(f.avg('COLUNA_5').alias('COLUNA_6'),
                                          f.count('COLUNA_5').alias('COLUNA_7'))

In [None]:
PANDAS.describe()
SPARK.summary().show()

In [None]:
# OBS: NORMALMENTE UTILIZAMOS LEFT, POREM EXISTEM OUTRAS FORMAS DE MERGE/JOIN: 'inner', 'right', 'outer', ENTRE OUTROS
PANDAS.merge(PANDAS_2, on='COLUNA_1', how='left')
SPARK.join(SPARK_2, 'COLUNA_1', how='left')

In [None]:
pd.concat([PANDAS, PANDAS_2], ignore_index=True)
SPARK.union(SPARK_2)

In [None]:
from pandassql import sqldf
sqldf("SELECT * FROM PANDAS", locals())

SPARK.CreateOrReplaceTempView("SPARK_TEMP")
spark.sql("SELECT * FROM SPARK_VIEW")

In [None]:
# A FORMA ABAIXO DE SPARK AUTOMATICAMENTE PARTICIONA O ARQUIVO
# obs: não é possivel exportar os arquivos para a pasta local com spark, 
#      como fazemos no pandas, então, no minimo temos que criar uma pasta dentro da pasta local
PANDAS.to_csv('PANDAS_TO_SPARK.csv', sep=';' index=False)
SPARK.writer.csv(path='./PANDAS_TO_SPARK_CSV', mode='overwrite', sep=';', header=True)

PANDAS.to_parquet('PANDAS_TO_SPARK.parquet', index=False)
SPARK.writer.parquet(path='./PANDAS_TO_SPARK_PARQUET'mode='overwrite')

# PARA CRIAR UM ARQUIVO ÚNICO COM SPARK
SPARK.coalesce(1).write.csv(path='PANDAS_TO_SPARK_N_PARTICIONADO', mode='overwrite', sep=';', header=True)

In [None]:
# AO FINALIZAR UM NOTEBOOK É NECESSÁRIO FECHAR A SEÇÃO SPARK COM:
spark.stop()