In [1]:
!pip install --upgrade pyspark
!pip install --upgrade setuptools 



In [5]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f 

In [None]:
spark = SparkSession.builder.appName('spark').getOrCreate()

In [None]:
df = spark.read.csv('arquivo.csv', 
                    header=True, # existe cabeçalho no arquivo #
           inferSchema=True # spark, defina as colunas #
                        )

In [None]:
df.printSchema()

In [None]:
df.show(2, vertical=True) # gera um df na vertical e não na horizontal

In [None]:
df = df.select(
                'coluna1', 'coluna2'
                ) # selecionar uma visão com apenas x colunas #

In [None]:
df = df.filter(f.col('colunaX') > 1) # selecionar os valores que sejam maior que 1 presentes na colunaX # 
df = df.filter(f.col('colunaX') >= 1) # 

df = df.filter(f.col('colunaX') == 'oi') #
df = df.filter(f.col('colunaX') == 1) #

df = df.filter(f.col('colunaX') < 1) #
df = df.filter(f.col('colunaX') <= 1) #

df = df.filter(f.col('colunaX') != 1) #

df = df.filter(f.col('colunaX') == 1) #

df = df.filter((f.col('colunaX') == 1) & (f.col('colunaY') != 'ab')) #
df = df.filter((f.col('colunaX') == 1) | (f.col('colunaY') != 'ab')) #


In [None]:
df.count() # contar linhas do df #

df.distinct().count() # contar linhas unicas do df # 

df.select('coluna').distinct().count() # contar linhas unicas de x colunas # 

df = df.distinct() # selecionar apenas valores unicos # 

In [None]:
df_sql = df.createOrReplaceTempView('tabela') # criar uma tabela temporaria a partir de um df # 

consulta = spark.sql("""
                        SELECT * FROM database.tabela
                        """)

In [None]:
# Gravar DataFrame em formato CSV #
df.write.csv("saida.csv", header=True)

In [None]:
x = df.groupBy(
                'coluna_A', 'coluna_B'
                ).agg(
                        f.count(('coluna_C').alias('nome_contagem'))
                    )

x = df.groupBy(
                'coluna_A', 'coluna_B'
                ).agg(
                        f.countDistinct(('coluna_C').alias('nome'))
                    )

x = df.groupBy(
                'coluna_A', 'coluna_B'
                ).agg(
                        f.sum(('coluna_C').alias('nome'))
                    )

x = df.groupBy(
                'coluna_A', 'coluna_B'
                ).agg(
                        f.avg(('coluna_C').alias('nome'))
                    )

x = df.groupBy(
                'coluna_A', 'coluna_B'
                ).agg(
                        f.min(('coluna_C').alias('nome')), 
                        f.max(('coluna_D').alias('nome'))
                    )

x = df.groupBy(
                'coluna'
                ).agg(
                        f.collect_list('outra_coluna').alias('lista_distinta') 
                        # coletar uma lista de valores distintos de uma coluna # 
                    )

x = df.groupBy(
                'coluna_A', 'coluna_B'
                ).agg(
                        (f.col('coluna_C') - f.col('coluna_D')).alias('nome')
                    )


In [None]:
df = df.withColumn('nova_coluna', f.col('coluna_a') + f.col('coluna_b')) # somar os valores de duas colunas #

df = df.withColumnRenamed('nome_alterado', 'nome_original') # alterar o nome da coluna #

df = df.withColumn('nova_coluna', f.concat(f.col('coluna_a'), f.lit(' '), f.col('coluna_b')))

df = df.withColumn('nova_coluna', f.concat(f.lit(2), f.col('coluna_a'))) # criar valor e adicionar a coluna #

df = df.withcolumn('nova_coluna', f.lpad(f.col('coluna_a'), 2, '0')) # inserir um valor a esquerda # 

df = df.withcolumn('nova_coluna', f.rpad(f.col('coluna_a'), 2, '0')) # inserir um valor a direita # 

In [None]:
df_join = df.join(
    df_2, 
    on = ['chave_id'],
    how = 'inner' # encontrar apenas os valores em comuns dos dois dataframes #
)

df_join = df.join(
    df_2, 
    on = ['chave_id'],
    how = 'full' # adicionar tudo dos dois dataframes #
)

df_join = df.join(
    df_2, 
    on = ['chave_id'],
    how = 'left' # adicionar as informações do dataframe a direita ao da esquerda #
)

# a diferença entre o join e o union é que o union precisa de dataframes iguais e 
# o join pode ser com dataframes diferentes com um identificador em comum #

df_union = df.union(df_2)

In [None]:
df = df.withColumn(
                    'nova coluna', f.when(f.col('coluna_x') > 10, 'maior que 10'
                    ).otherwise('menor que 10')
                    )

df = df.orderBy(f.col('coluna').desc())

In [None]:
df = df.filter(f.col('coluna').isNull()) # filtar apenas os valores nulos de uma coluna #

df = df.filter(f.col('coluna').isNotNull()) # filtrar apenas os valores não nulos de uma coluna #

df = df.fillna({'coluna': 'valor_padrao'}) # preencher valores nulos em uma coluna # 

df_not_null = df.dropna(subset=['coluna1', 'coluna2']) # dropar linhas com valores nulos # 

In [None]:
# manipulação de datas # 
import datetime as dt 

# criando uma data passada # 
dt_hoje = dt.datetime.now()
df = df.withColumn('data_x', f.lit(dt_hoje - dt.timedelta(days=60)))

# separando a data # 
df = df.withColumn('data_date', f.col('data_string_ou_outro').cast('date'))

df = df.withColumn("ano", f.year("data_date"))
df = df.withColumn("mes", f.month("data_date"))
df = df.withColumn("dia", f.day("data_date"))
df = df.withColumn("semana_ano", f.weekofyear("data_date"))
df = df.withColumn("dia_semana", f.dayofweek("data_date"))

# a diferença entre duas datas # 
df = df.withColumn('diferenca_dias', f.datediff(df_1['data_a'], df_2['data_b']))

In [None]:
# converter dados # 
df = df.withColum('coluna_alterada', f.col('coluna').cast('int')) # inteiro # 

df = df.withColum('coluna_alterada', f.col('coluna').cast('double')) # float # 

df = df.withColum('coluna_alterada', f.col('coluna').cast('string')) # 'texto' # 

df = df.withColumn('coluna_alterada', f.col('coluna').cast('date')) # 'yyyy-MM-dd' #

In [None]:
# exemplo de for # 

lista_colunas = ['coluna_1', 'coluna_2']

mediana = df_clientes_reduzido.groupBy(
    "fl_churn",
    "fl_plano_internacional",
    "fl_plano_correio_voz"
    ).agg(
        *[f.expr(f"percentile_approx({col}, 0.5)").alias(f'mediana_{col}') for col in lista_colunas] # 
        )