In [1]:
import time
t0 = time.time()

In [2]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
[K     |████████████████████████████████| 204.2 MB 30 kB/s s eta 0:00:01
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 48.4 MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612244 sha256=8fcf5c64215ae51379a043d7243868e4ab7d84e593cd6fe779d08f14f8da5b8e
  Stored in directory: /root/.cache/pip/wheels/5e/34/fa/b37b5cef503fc5148b478b2495043ba61b079120b7ff379f9b
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1
Note: you may need to restart the kernel to use updated packages.


In [3]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, SparkSession
print('Installation takes %s seconds'%(time.time() - t0))

Installation takes 35.363959550857544 seconds


In [4]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("Word Count") \
                    .config("spark.some.config.option", "some-value") \
                    .getOrCreate()
spark

## 1. Nhap du lieu thu cong

In [5]:
sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [6]:
from pyspark.sql import Row
df = sc.parallelize(
    [
        Row(name='Alice', age=5, height=80),
        Row(name='Alice', age=5, height=80),
        Row(name='Alice', age=10, height=80)
    ]
).toDF()
df.show()

+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
|Alice|  5|    80|
|Alice| 10|    80|
+-----+---+------+



In [7]:
SQLContext.registerDataFrameAsTable(sc, df = df, tableName = "table1")

In [8]:
help(SQLContext.sql)

Help on function sql in module pyspark.sql.context:

sql(self, sqlQuery)
    Returns a :class:`DataFrame` representing the result of the given query.
    
    :return: :class:`DataFrame`
    
    >>> sqlContext.registerDataFrameAsTable(df, "table1")
    >>> df2 = sqlContext.sql("SELECT field1 AS f1, field2 as f2 from table1")
    >>> df2.collect()
    [Row(f1=1, f2='row1'), Row(f1=2, f2='row2'), Row(f1=3, f2='row3')]
    
    .. versionadded:: 1.0



In [9]:
query = spark.sql("SELECT * FROM table1")
query.collect()

[Row(name='Alice', age=5, height=80),
 Row(name='Alice', age=5, height=80),
 Row(name='Alice', age=10, height=80)]

In [10]:
query.show()

+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice|  5|    80|
|Alice|  5|    80|
|Alice| 10|    80|
+-----+---+------+



## 2. Tai du lieu co san~
### 2.1. Type = `csv`

In [11]:
import pandas as pd
df = pd.read_csv(r'../input/covid19s-impact-on-airport-traffic/covid_impact_on_airport_traffic.csv')
print(df.shape)
df.head()

(5936, 11)


Unnamed: 0,AggregationMethod,Date,Version,AirportName,PercentOfBaseline,Centroid,City,State,ISO_3166_2,Country,Geography
0,Daily,2020-07-05,1.0,Kingsford Smith,52,POINT(151.180087713813 -33.9459774986125),Sydney,New South Wales,AU,Australia,"POLYGON((151.164354085922 -33.9301772341877, 1..."
1,Daily,2020-05-28,1.0,Kingsford Smith,61,POINT(151.180087713813 -33.9459774986125),Sydney,New South Wales,AU,Australia,"POLYGON((151.164354085922 -33.9301772341877, 1..."
2,Daily,2020-05-07,1.0,Kingsford Smith,62,POINT(151.180087713813 -33.9459774986125),Sydney,New South Wales,AU,Australia,"POLYGON((151.164354085922 -33.9301772341877, 1..."
3,Daily,2020-06-24,1.0,Kingsford Smith,58,POINT(151.180087713813 -33.9459774986125),Sydney,New South Wales,AU,Australia,"POLYGON((151.164354085922 -33.9301772341877, 1..."
4,Daily,2020-08-05,1.0,Kingsford Smith,20,POINT(151.180087713813 -33.9459774986125),Sydney,New South Wales,AU,Australia,"POLYGON((151.164354085922 -33.9301772341877, 1..."


### convert csv to parquert

In [12]:
df = spark.read.format("csv").option("header", "true").load(r'../input/covid19s-impact-on-airport-traffic/covid_impact_on_airport_traffic.csv')
df.show(5)

+-----------------+----------+-------+---------------+-----------------+--------------------+------+---------------+----------+---------+--------------------+
|AggregationMethod|      Date|Version|    AirportName|PercentOfBaseline|            Centroid|  City|          State|ISO_3166_2|  Country|           Geography|
+-----------------+----------+-------+---------------+-----------------+--------------------+------+---------------+----------+---------+--------------------+
|            Daily|2020-07-05|    1.0|Kingsford Smith|               52|POINT(151.1800877...|Sydney|New South Wales|        AU|Australia|POLYGON((151.1643...|
|            Daily|2020-05-28|    1.0|Kingsford Smith|               61|POINT(151.1800877...|Sydney|New South Wales|        AU|Australia|POLYGON((151.1643...|
|            Daily|2020-05-07|    1.0|Kingsford Smith|               62|POINT(151.1800877...|Sydney|New South Wales|        AU|Australia|POLYGON((151.1643...|
|            Daily|2020-06-24|    1.0|Kingsfor

### Luu dataframe moi tao voi ten table2

In [13]:
SQLContext.registerDataFrameAsTable(sc, df = df, tableName = "table2")

#### Example. Create a query to count `country` from `table2`

In [14]:
query = spark.sql("""
                    SELECT country, COUNT(*) 
                    FROM table2 
                    GROUP BY country""")
query.show()

+--------------------+--------+
|             country|count(1)|
+--------------------+--------+
|United States of ...|    3642|
|               Chile|     195|
|              Canada|    1888|
|           Australia|     211|
+--------------------+--------+



### 2.2. Load `txt.file`

In [15]:
df = spark.read.text(r'../input/poetry/Lil_Wayne.txt')
df.show(5)

+--------------------+
|               value|
+--------------------+
|They call me Mr C...|
|Of the deads fore...|
|Spilled the heart...|
|I will put them b...|
|Gracias Im crazy ...|
+--------------------+
only showing top 5 rows



#### Example 3. Extract unique `value` started with letter `A` and has almost 15 characters.

In [16]:
SQLContext.registerDataFrameAsTable(sc, df = df, tableName = "table3")

query_2 = spark.sql("""
                    SELECT DISTINCT(*), length(value) AS len
                    FROM table3 
                    WHERE value LIKE 'A%' 
                          AND length(value) < 15
                    ORDER BY len
                    """)
query_2.show()

+--------------+---+
|         value|len|
+--------------+---+
|            Ai|  2|
|           Aha|  3|
|           Aye|  3|
|         And I|  5|
|       Alright|  7|
|       And man|  7|
|      Assholes|  8|
|      Ay Wayne|  8|
|  A damn shame| 12|
| Aint no issue| 13|
|Are right here| 14|
+--------------+---+



### 2.3. Load `parquet.file`

In [17]:
query.write.parquet('customer_parquet')
df = sqlContext.read.parquet('customer_parquet')
df.show()

+----------+------+---+
|CustomerID|Gender|Age|
+----------+------+---+
|         1|  Male| 19|
|         2|  Male| 21|
|         3|Female| 20|
|         4|Female| 23|
|         5|Female| 31|
|         6|Female| 22|
|         7|Female| 35|
|         8|Female| 23|
|         9|  Male| 64|
|        10|Female| 30|
|        11|  Male| 67|
|        12|Female| 35|
|        13|Female| 58|
|        14|Female| 24|
|        15|  Male| 37|
|        16|  Male| 22|
|        17|Female| 35|
|        18|  Male| 20|
|        19|  Male| 52|
|        20|Female| 35|
+----------+------+---+
only showing top 20 rows



## 3. Save the `query result` as `sqlite, csv`

In [18]:
import os
query.write.csv('data_temp_csv')
query.write.parquet('data_temp_parquet')