In [None]:
from os.path import abspath
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, StringType, IntegerType, FloatType, DataType, DateType
from pyspark.sql.functions import *



# Root folders of the two sample-data checkouts used by the notebook.
# Note that sourcepath ends with '/', while lrnpath does not.
sourcepath = '/home/doug/ProjetosEstudo/Spark-The-Definitive-Guide/data/'
lrnpath = '/home/doug/ProjetosEstudo/LearningSparkV2/databricks-datasets/learning-spark-v2'

# Build the session against the master at spark://DOUGPC.:7077, or reuse the
# one that already exists.
spark = (
    SparkSession.builder
    .master('spark://DOUGPC.:7077')
    .appName('SparkSQLSources')
    .getOrCreate()
)

# Last expression of the cell: the notebook renders the session object.
spark

In [None]:
# Session-wide Spark settings: these apply to every file-based read that
# follows.

# Skip files that turn out to be corrupted instead of failing the read.
spark.sql(" set spark.sql.files.ignoreCorruptFiles= True ")

# Skip files that have gone missing (e.g. one file out of a sequence of files).
spark.sql(" set spark.sql.files.ignoreMissingFiles= True ")

# The remaining generic file-source options are NOT session configs: a
# `spark.sql.files.<name>` key for them is stored but never consulted, so they
# have to be passed on each individual read instead, e.g.
#   spark.read.load(path, format='csv', pathGlobFilter='*.csv')
#
#   pathGlobFilter       - only read files whose name matches the pattern.
#   recursiveFileLookup  - look files up recursively; partition inference is
#                          disabled when it is on.
#   modifiedBefore /     - two separate options that together define a time
#   modifiedAfter          window on the files' modification time; only files
#                          inside the window are read.

In [None]:
# Explicit schema for the flight-summary data: two country names and a count.
flightschema = StructType([
    StructField('ORIGIN_COUNTRY_NAME', StringType(), False),
    StructField('DEST_COUNTRY_NAME', StringType(), False),
    StructField('count', IntegerType(), False),
])

# inferSchema tells Spark to read the source and assign a data type to each
# column, which can add processing cost when it is true. It is switched off
# here because the schema is supplied explicitly.
framereader = (
    spark.read
    .format('json')
    .schema(flightschema)
    .option('mode', "FAILFAST")
    .option('inferSchema', False)
    .load(f'{lrnpath}/flights/summary-data/json/2010-summary.json')
    .withColumnRenamed('ORIGIN_COUNTRY_NAME', 'origin')
    .withColumnRenamed('DEST_COUNTRY_NAME', 'destine')
    .withColumnRenamed('count', 'flyquantity')
)

framereader.show(5)

In [None]:

# Read the JSON file with every setting handed to load() as a keyword argument.
jsonframe = spark.read.load(
    path=f'{lrnpath}/flights/summary-data/json/2010-summary.json',
    format='json',
    schema=flightschema,
    mode='failFast',
    inferSchema=False,
)

# Renaming a column produces a new DataFrame and does not overwrite the current
# one, so the renamed columns are received by a new variable. This separation
# only exists because the rename is its own statement, apart from the read:
# jsonframe keeps the original column names.
jsonrenamed = (
    jsonframe
    .withColumnRenamed('ORIGIN_COUNTRY_NAME', 'origin')
    .withColumnRenamed('DEST_COUNTRY_NAME', 'destine')
    .withColumnRenamed('count', 'flyquantity')
)

jsonrenamed.show(2)


In [None]:
# Print the LearningSparkV2 dataset root to check the configured path.
print(lrnpath)

In [None]:
# Importing straight from a SQL command: the FROM clause names the format,
# followed by the back-quoted path of the file, so no table or view has to be
# registered beforehand.

# The same pattern returning every column of the station CSV, kept for reference:
# readsql = spark.sql("""
#     SELECT *
#     FROM csv.`file:///home/doug/ProjetosEstudo/Spark-The-Definitive-Guide/data/bike-data/201508_station_data.csv`
# """)
# readsql.show(2)

# Sum of column _c4 of the station CSV, exposed as totaldockcount.
# (_c4 is a positional column name; presumably the dock-count column, as the
# alias suggests - confirm against the file.)
sumflyqtd = spark.sql(""" 
                    SELECT sum(_c4) as totaldockcount
                    FROM 
                    csv. 
                    `file:///home/doug/ProjetosEstudo/Spark-The-Definitive-Guide/data/bike-data/201508_station_data.csv`
                    """)

sumflyqtd.show(2)

# Flight counts summed per origin country, keeping only the groups for
# Russia and United States.
groupquery = spark.sql(""" 
                    SELECT ORIGIN_COUNTRY_NAME, SUM(count) as total 
                    FROM
                    parquet. 
                    `file:///home/doug/ProjetosEstudo/LearningSparkV2/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet`
                    GROUP BY ORIGIN_COUNTRY_NAME
                    HAVING ORIGIN_COUNTRY_NAME = 'Russia' or ORIGIN_COUNTRY_NAME = 'United States'
                    """)

groupquery.show(5)


In [None]:
# Read the Parquet file straight from SQL, then narrow the result with
# DataFrame filters.
readprq = (
    spark.sql(""" 
                    SELECT * FROM parquet.
                    `file:///home/doug/ProjetosEstudo/LearningSparkV2/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet` """)
    .filter("count > 10")
    .filter("ORIGIN_COUNTRY_NAME = 'United States' ")
)

# show(0) asks for zero rows, so only the column header is printed.
readprq.show(0)

# Same selection written with where(), which is an alias of filter().
# show() returns None, so it is called in its own statement: chaining it onto
# the assignment would leave readprq bound to None instead of the DataFrame.
readprq = (
    spark.sql(""" 
                    SELECT * 
                    FROM parquet.
                    `file:///home/doug/ProjetosEstudo/LearningSparkV2/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet` """)
    .where("count > 10")
    .where("ORIGIN_COUNTRY_NAME = 'United States' ")
)

readprq.show(0, truncate=True)

# Creating aggregations.
# In the agg() dictionary the column comes first (key), then the operation
# (value). The resulting column is named 'sum(count)' and is renamed afterwards.
groupedqtd = (
    spark.sql(""" 
                    SELECT * 
                    FROM parquet.
                    `file:///home/doug/ProjetosEstudo/LearningSparkV2/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet` """)
    .groupBy('ORIGIN_COUNTRY_NAME')
    .agg({'count': 'sum'})
    .withColumnRenamed('sum(count)', 'origin_qtd')
)

groupedqtd.show(5)

In [None]:
# Total flight count per origin country, read straight from the Parquet file.
# agg() names its output column 'sum(count)'; it is renamed to origin_qtd.
groupedqtd = (
    spark.sql(""" 
                    SELECT * FROM parquet.
                    `file:///home/doug/ProjetosEstudo/LearningSparkV2/databricks-datasets/learning-spark-v2/flights/summary-data/parquet/2010-summary.parquet` """)
    .groupBy('ORIGIN_COUNTRY_NAME')
    .agg({'count': 'sum'})
    .withColumnRenamed('sum(count)', 'origin_qtd')
)

groupedqtd.show(10)

In [None]:
# Schema for the bike-station CSV, built field by field; every field is
# declared as non-nullable.
bikeschema = (
    StructType()
    .add('station_id', IntegerType(), False)
    .add('name', StringType(), False)
    .add('lat', FloatType(), False)
    .add('long', FloatType(), False)
    .add('dockcount', IntegerType(), False)
    .add('landmark', StringType(), False)
    .add('installation', StringType(), False)
)

# failFast reports a failure if any row is corrupted.
permimport = spark.read.format('csv').load(
    f'{sourcepath}/bike-data/201508_station_data.csv',
    schema=bikeschema,
    header=True,
    mode='failFast',
    inferschema=False,
)

permimport.show(5)

In [None]:
# img05.1 - dropmalformed will drop the rows that are corrupted.
# NOTE(review): `mode` is documented as a parse option of the CSV/JSON readers;
# confirm that it has any effect on a Parquet read.

# Explicit schema for the flight-summary Parquet file.
parquetschema = (
    StructType()
    .add('DEST_COUNTRY_NAME', StringType(), False)
    .add('ORIGIN_COUNTRY_NAME', StringType(), False)
    .add('count', IntegerType(), False)
)

parquetopt = spark.read.format('parquet').load(
    f'{sourcepath}/flight-data/parquet/2010-summary.parquet',
    schema=parquetschema,
    mode='dropmalformed',
)

parquetopt.show()

In [None]:
# Directory read: skip corrupt files and keep only the files modified after
# the given timestamp. The option key has to be spelled exactly
# "ignoreCorruptFiles"; a misspelled key is not recognised by the reader.
# NOTE(review): with header=True plus an explicit schema, Spark applies the
# schema by position by default - confirm that flightschema's field order
# matches the column order of these CSV files.
sequencecsv = (
    spark.read
    .option("ignoreCorruptFiles", "true")
    .csv(
        f'{sourcepath}/flight-data/csv/',
        header=True,
        modifiedAfter='2011-01-01 23:59:59',     # type: ignore
        schema=flightschema,
    )
)

# Single-file read through the generic load(), with recursive lookup enabled.
globcsv = spark.read.load(
    f'{sourcepath}/flight-data/csv/2010-summary.csv',
    format='csv',
    recursiveFileLookup=True,
    header=True,
)

# Directory read restricted to *.csv files modified before the given timestamp.
timecsv = spark.read.load(
    f'{sourcepath}/flight-data/csv/',
    pathGlobFilter="*.csv",
    format='csv',
    header=True,
    modifiedBefore='2011-01-01 23:59:59',
    schema=flightschema,
)

# Row counts of the three reads, to compare what each set of options picks up.
print(sequencecsv.count())
print(globcsv.count())
print(timecsv.count())

In [None]:
# From here on the notebook deals with Parquet.
# Every field imported from a Parquet source accepts nulls ("nullable = true").

# lrnpath has no trailing slash, so the separator must be written explicitly.
parquetpath = f'{lrnpath}/sf-airbnb/sf-airbnb-clean-100p.parquet'

parqueframe = spark.read.parquet(parquetpath)

# A bare `parqueframe.schema` in the middle of a cell displays nothing, so the
# schema is printed explicitly.
parqueframe.printSchema()

# A single, explicit save mode. Passing mode= to saveAsTable() replaces any
# mode set earlier on the writer, so mixing 'overwrite' and 'append' silently
# appended - duplicating the rows every time the cell was re-run. 'overwrite'
# keeps the cell idempotent.
parqueframe.write.saveAsTable('tableparquet', mode='overwrite')

In [None]:
# Temporary view created from the DataFrame that is already loaded.
parqueframe.createOrReplaceTempView('airbnbview')

# Temporary view declared in SQL directly on top of the Parquet file.
# The drawback is having to spell out the complete path of the file.
spark.sql("""
          
          CREATE OR REPLACE TEMPORARY VIEW PRQ_VIEW
            USING parquet
            OPTIONS (
                PATH '/home/doug/ProjetosEstudo/LearningSparkV2/databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-clean-100p.parquet'
            )
          
          """)

# Query the SQL-defined view and display the first rows.
(
    spark
    .sql(""" select * from prq_view""")
    .show()
)

In [None]:
# Despite its name, parquetfr holds the 2010 flight-summary CSV
# (schema inferred, first line used as header).
parquetfr = spark.read.csv(f'{sourcepath}flight-data/csv/2010-summary.csv', inferSchema= True, header=True)

# Despite its name, flightfr holds the contents of the bike-data directory,
# with the original column titles renamed to snake_case.
# withColumnRenamed() is a no-op when the source column does not exist, so the
# source name must match exactly: 'Duration' (it was misspelled 'Duation',
# which silently left that column unrenamed).
# NOTE(review): the whole bike-data directory is loaded; the captured log at
# the end of the notebook reports a 7-column header for
# 201508_station_data.csv against an 11-field schema, which suggests the
# directory mixes files with different layouts - confirm whether only the trip
# file should be read here.
flightfr = ((spark.
                read.
                format('csv').
                load(f'{sourcepath}bike-data', inferSchema= True, header=True).
                withColumnRenamed('Trip ID', 'trip_id').
                withColumnRenamed('Duration', 'duration').
                withColumnRenamed('Start Date', 'start_date').
                withColumnRenamed('Start Station', 'start_station').
                withColumnRenamed('Start Terminal', 'start_terminal').
                withColumnRenamed('End Date', 'end_date').
                withColumnRenamed('End Station', 'end_station').
                withColumnRenamed('End Terminal', 'end_terminal').
                withColumnRenamed('Bike #', 'bike_identity').
                withColumnRenamed('Subscriber Type', 'sub_type').
                withColumnRenamed('Zip Code', 'zip_code')

            ))

# mergeSchema
# Next step: write both DataFrames under the same directory and read them back
# with mergeSchema.

parquetfr.show(5)
flightfr.show(5)

In [None]:
# Persist each DataFrame as Parquet in its own df=<n> subdirectory of /tmp/prq,
# replacing whatever a previous run left there.
flightfr.write.mode('overwrite').parquet('/tmp/prq/df=1')
parquetfr.write.mode('overwrite').parquet('/tmp/prq/df=2')

In [None]:
# Read everything under /tmp/prq as one DataFrame, merging the schemas of the
# Parquet files found there.
mergeparquet = (
    spark.read
    .option('mergeSchema', True)
    .parquet('/tmp/prq')
)

# Print the merged Parquet schema.
mergeparquet.printSchema()

# Write the merged DataFrame to its own directory.
mergeparquet.write.parquet('/tmp/mergeprq/', mode='overwrite')

# Create the table with the merged schema. The default save mode raises an
# error when the table already exists, which made this cell fail on every
# re-run; 'overwrite' matches the Parquet write above and keeps the cell
# re-runnable.
mergeparquet.write.saveAsTable('bike_trip_merged', mode='overwrite')

In [None]:
# Query the merged Parquet table, keeping only the rows where df = 2.
# fillna() with a string value replaces nulls in the string columns with
# 'Not Av' before the result is displayed.
(
    spark.sql("""  
                select 
                    trip_id,
                    start_date,
                    end_date,
                    dest_country_name as destine,
                    origin_country_name as origin,
                    df
                from bike_trip_merged
                where df = 2
                
        """)
    .fillna('Not Av')
    .show()
)

In [76]:
# Expose flightfr to Spark SQL as the temporary view 'flightview_parquet',
# replacing any existing view of that name.
flightfr.createOrReplaceTempView('flightview_parquet')

In [77]:
# No format is given to the writer, so save() falls back to the default data
# source - Parquet is the default format.
(
    spark.sql(""" 
          select * from flightview_parquet
          """)
    .write
    .save('/tmp/flight', mode='overwrite')
)
            

23/08/19 14:58:46 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 7, schema size: 11
CSV file: file:///home/doug/ProjetosEstudo/Spark-The-Definitive-Guide/data/bike-data/201508_station_data.csv
                                                                                