In [1]:
# Installing required packages
!pip install pyspark
!pip install findspark



In [2]:
import findspark
findspark.init()
import datetime

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, BooleanType

In [4]:
def CriaVw(df):
    return df.createOrReplaceTempView("Data")

def Consolida_SQL(_col):
    return spark.getOrCreate().sql(f"Select {_col}, count(*) from Data Group By {_col} order by 1").show()
    
def Consolida(_col, df):
    CriaVw(df)
    return Consolida_SQL(_col)

In [5]:
# Criar o contexto do spark
sc = SparkContext()

# Instancia o criador de sessao do spark
spark = (SparkSession.builder
                     .master("local[7]")
                     .appName("Semana 3 - Desafio Transformação"))

In [6]:
schema_airports = StructType([
    StructField("faa",  StringType(),  True),
    StructField("name", StringType(),  True),
    StructField("lat",  FloatType(),   True),
    StructField("lon",  FloatType(),   True),
    StructField("alt",  IntegerType(), True),
    StructField("tz",   IntegerType(), True),
    StructField("dst",  StringType(),  True)
])

schema_planes = StructType([
    StructField("tailnum",      StringType(),  True),
    StructField("year",         IntegerType(), True),
    StructField("type",         StringType(),  True),
    StructField("manufacturer", StringType(),  True),
    StructField("model",        StringType(),  True),
    StructField("engines",      IntegerType(), True),
    StructField("seats",        IntegerType(), True),
    StructField("speed",        IntegerType(), True),
    StructField("engine",       StringType(),  True)
])

schema_flights = StructType([
    StructField("year",      IntegerType(), True),
    StructField("month",     IntegerType(), True),
    StructField("day",       IntegerType(), True),
    StructField("dep_time",  StringType(),  True),
    StructField("dep_delay", IntegerType(), True),
    StructField("arr_time",  StringType(),  True),
    StructField("arr_delay", IntegerType(), True),
    StructField("carrier",   StringType(),  True),
    StructField("tailnum",   StringType(),  True),
    StructField("flight",    StringType(),  True),
    StructField("origin",    StringType(),  True),
    StructField("dest",      StringType(),  True),
    StructField("air_time",  IntegerType(), True),
    StructField("distance",  IntegerType(), True),
    StructField("hour",      IntegerType(), True),
    StructField("minute",    IntegerType(), True),
])

In [7]:
df_airports = (spark.getOrCreate().read
                  .format("csv")
                  .option("header", "true")
                  .schema(schema_airports)
                  .load("C:/Users/amarti40/OneDrive - Capgemini/Desktop/ACELERAÇÃO PYSPARK/Data/airports.csv"))

df_planes = (spark.getOrCreate().read
                  .format("csv")
                  .option("header", "true")
                  .schema(schema_planes)
                  .load("C:/Users/amarti40/OneDrive - Capgemini/Desktop/ACELERAÇÃO PYSPARK/Data/planes.csv"))

df_flights = (spark.getOrCreate().read
                  .format("csv")
                  .option("header", "true")
                  .schema(schema_flights)
                  .load("C:/Users/amarti40/OneDrive - Capgemini/Desktop/ACELERAÇÃO PYSPARK/Data/flights.csv"))

df_airports.show(5)
df_planes.show(5)
df_flights.show(5)

+---+--------------------+---------+---------+----+---+---+
|faa|                name|      lat|      lon| alt| tz|dst|
+---+--------------------+---------+---------+----+---+---+
|04G|   Lansdowne Airport|41.130474|-80.61958|1044| -5|  A|
|06A|Moton Field Munic...| 32.46057|-85.68003| 264| -5|  A|
|06C| Schaumburg Regional| 41.98934|-88.10124| 801| -6|  A|
|06N|     Randall Airport| 41.43191|-74.39156| 523| -5|  A|
|09J|Jekyll Island Air...|31.074472|-81.42778|  11| -4|  A|
+---+--------------------+---------+---------+----+---+---+
only showing top 5 rows

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182| null|Turbo-fan|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      

In [8]:
# Criacao das visões temporarias
df_airports.createOrReplaceTempView('airports')
df_planes.createOrReplaceTempView('planes')
df_flights.createOrReplaceTempView('flights')

# Planes


## Pergunta 1

In [9]:
df_planes_Final = df_planes.withColumn('tailchar',  (
                                                    F.when(F.col('tailnum').startswith("N"), F.regexp_replace(F.substring('tailnum',2,6), r'[0-9]{1,}', ''))
                                                    .when(~F.col('tailnum').startswith("N"), F.regexp_replace('tailnum', r'[0-9]{1,}', ''))                                                
                                                    .otherwise(F.col('tailnum'))
                                                    )
                                          )
df_planes_Final.groupBy(F.col("tailnum") ,F.col("tailchar")).count().distinct().orderBy(F.col("tailchar").desc()).show()

+-------+--------+-----+
|tailnum|tailchar|count|
+-------+--------+-----+
| N711ZX|      ZX|    1|
| N6704Z|       Z|    1|
| N3733Z|       Z|    1|
| N270YV|      YV|    1|
| N3758Y|       Y|    1|
| N6713Y|       Y|    1|
| N6705Y|       Y|    1|
| N3748Y|       Y|    1|
| N3762Y|       Y|    1|
|  N704X|       X|    1|
| N8322X|       X|    1|
| N216WR|      WR|    1|
| N500WR|      WR|    1|
| N909WN|      WN|    1|
| N399WN|      WN|    1|
| N904WN|      WN|    1|
| N447WN|      WN|    1|
| N918WN|      WN|    1|
| N967WN|      WN|    1|
| N441WN|      WN|    1|
+-------+--------+-----+
only showing top 20 rows



## Pergunta 2

In [10]:
df_planes_Final = df_planes_Final.withColumn('year',  (
                                                            F.when((F.col('year') == 0), 1996)
                                                            .otherwise(F.col('year'))
                                                         )
                                                )

df_planes.groupBy("year").count().distinct().orderBy("year").show()
df_planes_Final.groupBy("year").count().distinct().orderBy("year").show()

+----+-----+
|year|count|
+----+-----+
|null|   60|
|   0|    1|
|1959|    1|
|1963|    1|
|1968|    1|
|1975|    2|
|1976|    1|
|1980|    1|
|1984|    5|
|1985|   15|
|1986|   13|
|1987|   23|
|1988|   29|
|1989|   21|
|1990|   42|
|1991|   48|
|1992|   78|
|1993|   41|
|1994|   39|
|1995|   54|
+----+-----+
only showing top 20 rows

+----+-----+
|year|count|
+----+-----+
|null|   60|
|1959|    1|
|1963|    1|
|1968|    1|
|1975|    2|
|1976|    1|
|1980|    1|
|1984|    5|
|1985|   15|
|1986|   13|
|1987|   23|
|1988|   29|
|1989|   21|
|1990|   42|
|1991|   48|
|1992|   78|
|1993|   41|
|1994|   39|
|1995|   54|
|1996|   73|
+----+-----+
only showing top 20 rows



## Pergunta 3

In [11]:
w2 = Window.partitionBy("manufacturer", "model").orderBy(F.col("manufacturer"), F.col("model"), F.col("year")) 

df_planes_suport1 = df_planes_Final.filter(F.col("year").isNotNull())

df_planes_suport1 = (df_planes_suport1.withColumn("row",F.row_number().over(w2))
                    .filter((F.col("row") == 1))
                    .drop("row", "tailnum", "type", "engines", "seats", "speed", "engine", "tailchar")
                    .withColumnRenamed("year","year1")
                    .withColumnRenamed("manufacturer","manufacturer1")
                    .withColumnRenamed("model","model1")
                    )

w3 = Window.partitionBy(F.col("manufacturer")).orderBy(F.col("manufacturer"), F.col("year")) 

df_planes_suport2 = df_planes_Final.filter(F.col("year").isNotNull())

df_planes_suport2 = (df_planes_suport2.withColumn("row",F.row_number().over(w3))
                    .filter((F.col("row") == 1))
                    .drop("row", "tailnum", "type", "model", "engines", "seats", "speed", "engine", "tailchar")
                    .withColumnRenamed("year","year2")
                    .withColumnRenamed("manufacturer","manufacturer2")
                    )

df_planes_Final = (df_planes_Final.join(df_planes_suport1, 
                                        (df_planes_Final.manufacturer ==  df_planes_suport1.manufacturer1) 
                                        & (df_planes_Final.model ==  df_planes_suport1.model1)
                                        ,"left")
                  )

df_planes_Final = (df_planes_Final.withColumn('year',  (
                                                        F.when((F.col("year").isNull()) & (F.col("year1").isNotNull()) , F.col("year1"))
                                                        .otherwise (F.col("year"))
                                                        )
                                              ).drop("manufacturer1", "model1", "year1")
                  )

df_planes_Final = (df_planes_Final.join(df_planes_suport2, 
                                        df_planes_Final.manufacturer ==  df_planes_suport2.manufacturer2 
                                        ,"left")
                  )

df_planes_Final = (df_planes_Final.withColumn('year',  (
                                                        F.when((F.col("year").isNull()) & (F.col("year2").isNotNull()) , F.col("year2"))
                                                        .otherwise (F.col("year"))
                                                        )
                                              ).drop("manufacturer2", "year2")
                  )

df_planes.groupBy("year").count().distinct().orderBy("year").show()
df_planes_Final.groupBy("year").count().distinct().orderBy("year").show()

+----+-----+
|year|count|
+----+-----+
|null|   60|
|   0|    1|
|1959|    1|
|1963|    1|
|1968|    1|
|1975|    2|
|1976|    1|
|1980|    1|
|1984|    5|
|1985|   15|
|1986|   13|
|1987|   23|
|1988|   29|
|1989|   21|
|1990|   42|
|1991|   48|
|1992|   78|
|1993|   41|
|1994|   39|
|1995|   54|
+----+-----+
only showing top 20 rows

+----+-----+
|year|count|
+----+-----+
|null|    2|
|1959|    1|
|1963|    1|
|1968|    1|
|1975|    2|
|1976|    1|
|1980|    1|
|1984|    6|
|1985|   15|
|1986|   13|
|1987|   23|
|1988|   29|
|1989|   21|
|1990|   42|
|1991|   48|
|1992|   78|
|1993|   42|
|1994|   45|
|1995|   55|
|1996|   73|
+----+-----+
only showing top 20 rows



## Pergunta 4

In [12]:
date = datetime.date.today()

df_planes_Final = df_planes_Final.withColumn('age',  (
                                                             F.when(F.col('year').isNotNull(), date.strftime("%Y")-F.col('year'))
                                                             .otherwise('NaN')
                                                         )
                                                )

df_planes_Final.groupBy("age").count().distinct().orderBy("age").show()

+----+-----+
| age|count|
+----+-----+
|10.0|   90|
|11.0|   64|
|12.0|   46|
|13.0|   77|
|14.0|  110|
|15.0|   93|
|16.0|  128|
|17.0|  112|
|18.0|  132|
|19.0|  118|
|20.0|  139|
|21.0|  215|
|22.0|  189|
|23.0|  174|
|24.0|  169|
|25.0|   80|
|26.0|   73|
|27.0|   55|
|28.0|   45|
|29.0|   42|
+----+-----+
only showing top 20 rows



## Pergunta 5

In [13]:
df_planes_Final = df_planes_Final.withColumn('type',  (
                                                             F.when(F.col('type') == "Fixed wing multi engine", "MULTI_ENG")
                                                             .when(F.col('type') == "Fixed wing single engine", "SINGLE_ENG")
                                                             .when(F.col('type') == "Rotorcraft", "ROTORCRAFT")
                                                             .otherwise(None)
                                                         )
                                                )

df_planes_Final.groupBy("type").count().distinct().orderBy("type").show()

+----------+-----+
|      type|count|
+----------+-----+
| MULTI_ENG| 2615|
|ROTORCRAFT|    3|
|SINGLE_ENG|   10|
+----------+-----+



## Pergunta 6

In [14]:
df_planes_Final = df_planes_Final.withColumn('manufacturer', (
                                                                F.when((F.col('manufacturer').contains("AIRBUS")) , "AIRBUS")
                                                                .when((F.col('manufacturer').contains("BOEING")) , "BOEING")
                                                                .when((F.col('manufacturer').contains("BOMBARDIER")) , "BOMBARDIER")
                                                                .when((F.col('manufacturer').contains("CESSNA")) , "CESSNA")
                                                                .when((F.col('manufacturer').contains("EMBRAER")) , "EMBRAER")
                                                                .when((F.col('manufacturer').contains("SIKORSKY")) , "SIKORSKY")
                                                                .when((F.col('manufacturer').contains("CANADAIR")) , "CANADAIR")
                                                                .when((F.col('manufacturer').contains("PIPER")) , "PIPER")
                                                                .when((F.col('manufacturer').contains("MCDONNELL DOUGLAS")) , "MCDONNELL DOUGLAS")
                                                                .when((F.col('manufacturer').contains("CIRRUS")) , "CIRRUS")
                                                                .when((F.col('manufacturer').contains("BELL")) , "BELL")
                                                                .when((F.col('manufacturer').contains("KILDALL GARY")) , "KILDALL GARY")
                                                                .when((F.col('manufacturer').contains("LAMBERT RICHARD")) , "LAMBERT RICHARD")
                                                                .when((F.col('manufacturer').contains("BARKER JACK")) , "BARKER JACK")
                                                                .when((F.col('manufacturer').contains("ROBINSON HELICOPTER")) , "ROBINSON HELICOPTER")
                                                                .when((F.col('manufacturer').contains("GULFSTREAM")) , "GULFSTREAM")
                                                                .when((F.col('manufacturer').contains("MARZ BARRY")) , "MARZ BARRY")
                                                                .otherwise(F.col('manufacturer'))
                                                             )
                                            )

df_planes_Final.groupBy("manufacturer").count().distinct().orderBy("manufacturer").show()

+-------------------+-----+
|       manufacturer|count|
+-------------------+-----+
|             AIRBUS|  798|
|        BARKER JACK|    1|
|               BELL|    1|
|             BOEING| 1460|
|         BOMBARDIER|  214|
|           CANADAIR|    8|
|             CESSNA|    4|
|             CIRRUS|    1|
|            EMBRAER|   37|
|         GULFSTREAM|    1|
|       KILDALL GARY|    1|
|    LAMBERT RICHARD|    1|
|         MARZ BARRY|    1|
|  MCDONNELL DOUGLAS|   96|
|              PIPER|    2|
|ROBINSON HELICOPTER|    1|
|           SIKORSKY|    1|
+-------------------+-----+



## Pergunta 7

In [15]:
df_planes_Final = df_planes_Final.withColumn('model', (
                                                        F.trim(F.regexp_replace(F.col('model'), r'\(([^)]+)\)', ""))
                                                        )
                                            )

df_planes_Final.groupBy("model").count().distinct().orderBy("model").show()

+-------+-----+
|  model|count|
+-------+-----+
|    150|    1|
|   172M|    1|
|   206B|    1|
|  210-5|    1|
|   421C|    1|
|737-301|    2|
|737-3A4|    1|
|737-3G7|    2|
|737-3H4|  104|
|737-3K2|    2|
|737-3L9|    2|
|737-3Q8|    4|
|737-3T5|    1|
|737-3TO|    2|
|737-3Y0|    1|
|737-490|   16|
|737-4Q8|    9|
|737-4S3|    1|
|737-5H4|   10|
|737-705|    2|
+-------+-----+
only showing top 20 rows



## Pergunta 8

In [16]:
df_planes_Final = df_planes_Final.withColumn('speed', (
                                                                F.when((F.col('speed').isNull()) & (F.col('seats').isNotNull()) , F.ceil((F.col('seats')/0.36)))
                                                                .otherwise(F.col('speed'))
                                                             )
                                            )

df_planes_Final.groupBy("speed").count().distinct().orderBy("speed").show()

+-----+-----+
|speed|count|
+-----+-----+
|    6|    4|
|   12|    1|
|   14|    1|
|   17|    1|
|   39|    1|
|   56|    1|
|   89|   37|
|   90|    2|
|  107|    1|
|  108|    1|
|  112|    1|
|  126|    1|
|  153|   92|
|  223|   98|
|  264|   32|
|  278|   21|
|  356|    1|
|  389|  441|
|  395|   67|
|  403|   53|
+-----+-----+
only showing top 20 rows



## Pergunta 9

In [17]:
df_planes_Final = df_planes_Final.withColumn('engine_type', (
                                                                F.when((F.col('engine') == 'Turbo-fan') , "FAN")
                                                                .when((F.col('engine') == 'Turbo-jet') , "JET")
                                                                .when((F.col('engine') == 'Turbo-prop') , "PROP")
                                                                .when((F.col('engine') == 'Turbo-shaft') , "SHAFT")
                                                                .when((F.col('engine') == '4 Cycle') , "CYCLE")    
                                                                .otherwise(None)
                                                             )
                                            )

df_planes_Final.groupBy("engine_type").count().distinct().orderBy("engine_type").show()
df_planes_Final.groupBy("engine", "engine_type").count().distinct().orderBy("engine", "engine_type").show()

+-----------+-----+
|engine_type|count|
+-----------+-----+
|       null|   10|
|      CYCLE|    1|
|        FAN| 2127|
|        JET|  450|
|       PROP|   37|
|      SHAFT|    3|
+-----------+-----+

+-------------+-----------+-----+
|       engine|engine_type|count|
+-------------+-----------+-----+
|      4 Cycle|      CYCLE|    1|
|Reciprocating|       null|   10|
|    Turbo-fan|        FAN| 2127|
|    Turbo-jet|        JET|  450|
|   Turbo-prop|       PROP|   37|
|  Turbo-shaft|      SHAFT|    3|
+-------------+-----------+-----+



In [18]:
df_planes_Final.write.parquet("C:/Users/amarti40/OneDrive - Capgemini/Desktop/ACELERAÇÃO PYSPARK/Semana 3 - Desafio Transformação/Outputs_Planes_Transformacao.parquet")