## IMPORT CONTEXT

In [1]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [2]:
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext.getOrCreate(sc)

In [3]:
from pyspark.sql.types import *

In [4]:
from pyspark.sql import HiveContext
hc = HiveContext.getOrCreate(sc)

In [5]:
# Drop tables if need
sqlContext.sql("drop table cities")
sqlContext.sql("drop table lines")
sqlContext.sql("drop table station_lines")
sqlContext.sql("drop table stations")

DataFrame[]

In [6]:
# Show tables
hc.sql("show tables").show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|   carros|      false|
| default|   placas|      false|
+--------+---------+-----------+



## IMPORT CITIES DATA

In [7]:
citiesDF = sqlContext.read.load('C:/Dados/citylines/cities.csv', 
                                format='com.databricks.spark.csv', 
                                header='true', 
                                inferSchema='true')
# Count rows
citiesDF.count()

334

In [8]:
# Print Schema
citiesDF.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- coords: string (nullable = true)
 |-- start_year: integer (nullable = true)
 |-- url_name: string (nullable = true)
 |-- country: string (nullable = true)
 |-- country_state: string (nullable = true)



In [9]:
# Check data
citiesDF.show(3)

+---+--------+--------------------+----------+--------+---------+-------------+
| id|    name|              coords|start_year|url_name|  country|country_state|
+---+--------+--------------------+----------+--------+---------+-------------+
|  5|Aberdeen|  POINT(-2.15 57.15)|      2017|aberdeen| Scotland|         null|
|  6|Adelaide|POINT(138.6 -34.9...|      2017|adelaide|Australia|         null|
|  7| Algiers|POINT(3 36.83333333)|      2017| algiers|  Algeria|         null|
+---+--------+--------------------+----------+--------+---------+-------------+
only showing top 3 rows



In [10]:
# Export DataFrame to a Table
citiesDF.write.saveAsTable("cities")

In [11]:
# Print 3 rows from table
hc.sql("select * from cities limit 3").show()

+---+--------+--------------------+----------+--------+---------+-------------+
| id|    name|              coords|start_year|url_name|  country|country_state|
+---+--------+--------------------+----------+--------+---------+-------------+
|  5|Aberdeen|  POINT(-2.15 57.15)|      2017|aberdeen| Scotland|         null|
|  6|Adelaide|POINT(138.6 -34.9...|      2017|adelaide|Australia|         null|
|  7| Algiers|POINT(3 36.83333333)|      2017| algiers|  Algeria|         null|
+---+--------+--------------------+----------+--------+---------+-------------+



## IMPORT LINES DATA

In [12]:
linesDF = sqlContext.read.load('C:/Dados/citylines/lines.csv', 
                                format='com.databricks.spark.csv', 
                                header='true', 
                                inferSchema='true')
# Count rows
linesDF.count()

1343

In [13]:
# Print Schema
linesDF.printSchema()

root
 |-- id: integer (nullable = true)
 |-- city_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- url_name: string (nullable = true)
 |-- color: string (nullable = true)
 |-- system_id: integer (nullable = true)
 |-- transport_mode_id: integer (nullable = true)



In [14]:
# Check data
linesDF.show(30)

+----+-------+--------------------+--------------------+-------+---------+-----------------+
|  id|city_id|                name|            url_name|  color|system_id|transport_mode_id|
+----+-------+--------------------+--------------------+-------+---------+-----------------+
|  43|      4|             Línea 2|          43-linea-2|#ffbe2e|      267|                4|
|  75|     34|Línea 3 Metro de ...|75-linea-3-metro-...|   #000|      119|             null|
| 107|    126|             Línea 1|         107-linea-1|#434343|      249|             null|
| 604|     74|          La navette|      604-la-navette|#009ab9|      346|             null|
|  61|     66|  Línea 1 (Tramo 1A)|          61-linea-1|#49aa43|      250|             null|
|1471|     91|  Tokaido Shinkansen|1471-tokaido-shin...|#0000ff|      551|                1|
|   5|      1|                   A|                   a|#00b3da|      254|                4|
| 168|    261|             Línea 1|         168-linea-1|#f58223|      

In [15]:
# Export DataFrame to a Table
linesDF.write.saveAsTable("lines")

In [16]:
# Print 3 rows from table
hc.sql("select * from lines limit 3").show()

+---+-------+--------------------+--------------------+-------+---------+-----------------+
| id|city_id|                name|            url_name|  color|system_id|transport_mode_id|
+---+-------+--------------------+--------------------+-------+---------+-----------------+
| 43|      4|             Línea 2|          43-linea-2|#ffbe2e|      267|                4|
| 75|     34|Línea 3 Metro de ...|75-linea-3-metro-...|   #000|      119|             null|
|107|    126|             Línea 1|         107-linea-1|#434343|      249|             null|
+---+-------+--------------------+--------------------+-------+---------+-----------------+



## IMPORT STATION_LINES DATA

In [17]:
station_linesDF = sqlContext.read.load('C:/Dados/citylines/station_lines.csv', 
                                format='com.databricks.spark.csv', 
                                header='true', 
                                inferSchema='true')
# Count rows
station_linesDF.count()

16242

In [18]:
# Print Schema
station_linesDF.printSchema()

root
 |-- id: integer (nullable = true)
 |-- station_id: integer (nullable = true)
 |-- line_id: integer (nullable = true)
 |-- city_id: integer (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)



In [19]:
# Check data
station_linesDF.show(30)

+---+----------+-------+-------+-------------------+-------------------+
| id|station_id|line_id|city_id|         created_at|         updated_at|
+---+----------+-------+-------+-------------------+-------------------+
| 47|      7754|    570|     74|2017-11-21 00:00:00|2017-11-21 00:00:00|
| 48|      7771|    571|     74|2017-11-21 00:00:00|2017-11-21 00:00:00|
| 49|      7764|    571|     74|2017-11-21 00:00:00|2017-11-21 00:00:00|
| 50|      7763|    571|     74|2017-11-21 00:00:00|2017-11-21 00:00:00|
| 51|      7729|    571|     74|2017-11-21 00:00:00|2017-11-21 00:00:00|
| 52|      7772|    571|     74|2017-11-21 00:00:00|2017-11-21 00:00:00|
| 53|      8523|    582|     70|2017-11-21 00:00:00|2017-11-21 00:00:00|
| 54|      8525|    582|     70|2017-11-21 00:00:00|2017-11-21 00:00:00|
| 55|        93|     10|      1|2017-11-21 00:00:00|2017-11-21 00:00:00|
| 56|      6005|    442|    114|2017-11-21 00:00:00|2017-11-21 00:00:00|
| 57|      6102|    495|    114|2017-11-21 00:00:00

In [20]:
# Export DataFrame to a Table
station_linesDF.write.saveAsTable("station_lines")

In [21]:
# Print 3 rows from table
hc.sql("select * from station_lines limit 3").show()

+---+----------+-------+-------+-------------------+-------------------+
| id|station_id|line_id|city_id|         created_at|         updated_at|
+---+----------+-------+-------+-------------------+-------------------+
| 47|      7754|    570|     74|2017-11-21 00:00:00|2017-11-21 00:00:00|
| 48|      7771|    571|     74|2017-11-21 00:00:00|2017-11-21 00:00:00|
| 49|      7764|    571|     74|2017-11-21 00:00:00|2017-11-21 00:00:00|
+---+----------+-------+-------+-------------------+-------------------+



## IMPORT STATION DATA

In [22]:
stationsDF = sqlContext.read.load('C:/Dados/citylines/stations.csv', 
                                format='com.databricks.spark.csv', 
                                header='true', 
                                inferSchema='true')
# Count rows
stationsDF.count()

16416

In [23]:
# Print Schema
stationsDF.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- geometry: string (nullable = true)
 |-- buildstart: integer (nullable = true)
 |-- opening: integer (nullable = true)
 |-- closure: integer (nullable = true)
 |-- city_id: integer (nullable = true)



In [24]:
# Check data
stationsDF.show(3)

+----+-----------------+--------------------+----------+-------+-------+-------+
|  id|             name|            geometry|buildstart|opening|closure|city_id|
+----+-----------------+--------------------+----------+-------+-------+-------+
|7694| Keisei Tsudanuma|POINT(140.0248121...|      1921|   1921| 999999|    114|
|6003|Kossuth Lajos tér|POINT(19.04623765...|         0|      0| 999999|     29|
|7732|    Saint-Charles|POINT(5.3801556 4...|      1973|   1977| 999999|     74|
+----+-----------------+--------------------+----------+-------+-------+-------+
only showing top 3 rows



In [25]:
# Export DataFrame to a Table
stationsDF.write.saveAsTable("stations")

In [26]:
# Print 3 rows from table
hc.sql("select * from stations limit 3").show()

+----+-----------------+--------------------+----------+-------+-------+-------+
|  id|             name|            geometry|buildstart|opening|closure|city_id|
+----+-----------------+--------------------+----------+-------+-------+-------+
|7694| Keisei Tsudanuma|POINT(140.0248121...|      1921|   1921| 999999|    114|
|6003|Kossuth Lajos tér|POINT(19.04623765...|         0|      0| 999999|     29|
|7732|    Saint-Charles|POINT(5.3801556 4...|      1973|   1977| 999999|     74|
+----+-----------------+--------------------+----------+-------+-------+-------+



## DATA ANALYSIS

In [28]:
# Qual pais tem mais cidades?
hc.sql("select country, count(distinct name) cnt_city \
       from cities c \
       group by country \
       order by cnt_city desc \
       limit 1").show()

+-------------+--------+
|      country|cnt_city|
+-------------+--------+
|United States|     111|
+-------------+--------+



In [29]:
# Qual cidade tem mais linhas?
hc.sql("select c.name city_name , count(distinct l.id) count_lines \
       from cities c \
           join lines l on l.city_id = c.id \
       group by c.name \
       order by count_lines desc \
       limit 1").show()

+---------+-----------+
|city_name|count_lines|
+---------+-----------+
|    Tokyo|        253|
+---------+-----------+



In [30]:
# Quais são 10 linhas que passam por mais cidade?
hc.sql("select l.id, l.name line, count(distinct l.city_id) count_city \
       from lines l \
       group by l.id, l.name \
       order by count_city desc \
       limit 10").show()

+----+--------------------+----------+
|  id|                line|count_city|
+----+--------------------+----------+
| 850|         Barrie Line|         1|
| 275|Linha 6 • Laranja...|         1|
| 687|      Northern Lines|         1|
|1407|           Gose Line|         1|
|  40|            Línea 4A|         1|
| 506|        T 11 express|         1|
|1481|    Kobe Kosoku Line|         1|
| 641|        Hondori Line|         1|
|1104|            del Bajo|         1|
| 736|Disneyland Resort...|         1|
+----+--------------------+----------+



In [31]:
# O pais que tem mais linhas
hc.sql("select c.country, count(distinct l.id) count_lines \
       from cities c \
           join lines l on l.city_id = c.id\
       group by c.country \
       order by count_lines desc \
       limit 1").show()

+-------+-----------+
|country|count_lines|
+-------+-----------+
|  Japan|        365|
+-------+-----------+



In [32]:
# A linha que tem mais estações
hc.sql("select l.id, l.name line_name, count(distinct sl.station_id) count_stations \
       from lines l \
           join station_lines sl on sl.line_id = l.id \
       group by l.id, l.name \
       order by count_stations desc \
       limit 1").show()

+---+---------+--------------+
| id|line_name|count_stations|
+---+---------+--------------+
|658|     Roca|            95|
+---+---------+--------------+



In [33]:
# Quais são as 10 cidades que possuem as linhas com mais estações
hc.sql("select city_name, max(count_stations) max_stations\
       from (select l.id, c.name city_name, count(distinct sl.station_id) count_stations \
             from cities c \
                 join lines l on l.city_id = c.id \
                 join station_lines sl on sl.line_id = l.id \
             group by l.id, c.name \
             order by count_stations desc) \
       group by city_name \
       order by max_stations desc \
       limit 10").show()

# Eu usei MAX() function pra mostrar cidades diferentes, senão ia duplicar por causa das estações

+------------+------------+
|   city_name|max_stations|
+------------+------------+
|Buenos Aires|          95|
|    Bordeaux|          87|
|      London|          86|
|      Munich|          78|
|      Nantes|          75|
|       Paris|          72|
|       Tours|          62|
|        Graz|          62|
|   Innsbruck|          60|
| Montpellier|          60|
+------------+------------+



In [34]:
# Qual país são os mais presentes entre essas 10 cidades
hc.sql("select distinct country \
        from (select country, city_name, max(count_stations) max_stations\
              from (select l.id, c.name city_name, c.country, count(distinct sl.station_id) count_stations \
                   from cities c \
                       join lines l on l.city_id = c.id \
                       join station_lines sl on sl.line_id = l.id \
                   group by l.id, c.name, c.country \
                   order by count_stations desc) \
              group by country, city_name \
              order by max_stations desc \
              limit 10)").show()

+---------+
|  country|
+---------+
|Argentina|
|   France|
|  England|
|  Germany|
|  Austria|
+---------+



In [35]:
# O país que tem a maior média de estações
hc.sql("select country, avg(count_stations) avg_stations \
        from (select l.id, c.name, c.country, count(distinct sl.station_id) count_stations \
              from cities c \
                  join lines l on l.city_id = c.id \
                  join station_lines sl on sl.line_id = l.id \
              group by l.id, c.name, c.country \
              order by count_stations desc ) \
        group by country \
        order by avg_stations desc \
        limit 1").show()

+---------+------------------+
|  country|      avg_stations|
+---------+------------------+
|Argentina|31.045454545454547|
+---------+------------------+

