## Spark SQL for csv data querying

In [67]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
sqlContext = SQLContext(sc)

In [68]:
df2 = sqlContext.read.load("/Users/binggangliu/Downloads/WaterSites_info.csv",
                           format = 'com.databricks.spark.csv',
                           header = 'true', # use first line of file as header
                           inferSchema = 'true', # automatically infer data types
                           nullValue = 'NA')

In [69]:
# df2.printSchema()

In [70]:
df3 = df2.withColumnRenamed('Data Export', '_c0')

In [71]:
df3.select("_c0").distinct().show()

+--------------------+
|                 _c0|
+--------------------+
|         New Site/No|
|High Water Usage ...|
|Yes as part of co...|
|                null|
|                  No|
|                 Yes|
| Jun 2009 - May 2010|
|1 - sorted by sit...|
+--------------------+



In [72]:
df3.show(5)

+--------------------+-----------------+----+---------------+-------+------+----------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|                 _c0|              _c1| _c2|            _c3|    _c4|   _c5|       _c6|        _c7|                 _c8|                 _c9|                _c10|                _c11|                _c12|                _c13|                _c14|                _c15|                _c16|                _c17|                _c18|                _c19|                _c20|                _c21|                _c22|                _c23|                _c24|                _c25| _c26|
+--------------------+----------

In [73]:
from pyspark.sql.functions import monotonically_increasing_id
#df3.select(monotonically_increasing_id().alias('id')).show(10)
df3.select('_c4').show(10)

+-------------+
|          _c4|
+-------------+
|         null|
|         null|
|         null|
|         null|
|      Country|
|United States|
|United States|
|United States|
|        China|
|      Germany|
+-------------+
only showing top 10 rows



In [74]:
df3 = df3.withColumn('ID', monotonically_increasing_id().alias('ID'))
df3 = df3.filter(df3.ID > 3)
df3.show(4)

+--------------------+-----------------+-------+---------------+-------------+-------------+----------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+---+
|                 _c0|              _c1|    _c2|            _c3|          _c4|          _c5|       _c6|        _c7|                 _c8|                 _c9|                _c10|                _c11|                _c12|                _c13|                _c14|                _c15|                _c16|                _c17|                _c18|                _c19|                _c20|                _c21|                _c22|                _c23|                _c24|                _c25| _c

In [75]:
# df3.columns

In [76]:
df4 = df3.drop('_c0', '_c1', '_c12', '_c13', '_c14', '_c15', '_c16', '_c17', '_c18', '_c19', '_c20', '_c21', '_c22', '_c23', '_c24','_c25', '_c26')

In [77]:
df4.show(4)

+-------+---------------+-------------+-------------+----------+-----------+--------------------+--------------------+--------------------+--------------------+---+
|    _c2|            _c3|          _c4|          _c5|       _c6|        _c7|                 _c8|                 _c9|                _c10|                _c11| ID|
+-------+---------------+-------------+-------------+----------+-----------+--------------------+--------------------+--------------------+--------------------+---+
|   City|State/ Province|      Country|       Region|Latitude 1|Longitude 1|8A Delivered Sour...|8B Delivered Sour...|8B Surface and Ra...|8B Ground Water S...|  4|
|  Blair|             NE|United States|NORTH AMERICA|        42|        -96|                   0|             688,905|                   0|                   0|  5|
|  Blair|             NE|United States|NORTH AMERICA|        42|        -96|                   0|           9,488,173|                   0|                   0|  6|
|Memphis| 

In [78]:
from pyspark.sql.functions import regexp_replace

df4 = df4.withColumn('_c8', regexp_replace('_c8', ',', ''))
df4 = df4.withColumn('_c9', regexp_replace('_c9', ',', ''))
df4 = df4.withColumn('_c10', regexp_replace('_c10', ',', ''))
df4 = df4.withColumn('_c11', regexp_replace('_c11', ',', ''))

In [79]:
df4.select('_c8', '_c9', '_c10', '_c11').show(8)

+--------------------+--------------------+--------------------+--------------------+
|                 _c8|                 _c9|                _c10|                _c11|
+--------------------+--------------------+--------------------+--------------------+
|8A Delivered Sour...|8B Delivered Sour...|8B Surface and Ra...|8B Ground Water S...|
|                   0|              688905|                   0|                   0|
|                   0|             9488173|                   0|                   0|
|                   0|             7037363|                   0|                   0|
|                   0|             3315709|                   0|                   0|
|                   0|               13295|                   0|             2901482|
|                   0|               22685|                   0|             2653627|
|                   0|             1575256|                   0|              788616|
+--------------------+--------------------+-----------

In [80]:
from pyspark.sql.types import FloatType

df4 = df4.withColumn('_c6', df4._c6.cast(FloatType()))
df4 = df4.withColumn('_c7', df4._c7.cast(FloatType()))

df4 = df4.withColumn('_c8', df4._c8.cast(FloatType()))
df4 = df4.withColumn('_c9', df4._c9.cast(FloatType()))
df4 = df4.withColumn('_c10', df4._c10.cast(FloatType()))
df4 = df4.withColumn('_c11', df4._c11.cast(FloatType()))


In [81]:
df4.printSchema()

root
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: float (nullable = true)
 |-- _c7: float (nullable = true)
 |-- _c8: float (nullable = true)
 |-- _c9: float (nullable = true)
 |-- _c10: float (nullable = true)
 |-- _c11: float (nullable = true)
 |-- ID: long (nullable = false)



In [84]:
df5 = (df4.withColumnRenamed('_c2', 'City').withColumnRenamed('_c3', 'State').withColumnRenamed('_c4', 'Country')
       .withColumnRenamed('_c5', 'Region').withColumnRenamed('_c6', 'Latitude').withColumnRenamed('_c7', 'Longitude')
       .withColumnRenamed('_c8', 'Delivered_A').withColumnRenamed('_c9', 'Delivered_B')
       .withColumnRenamed('_c10', 'Surface_A').withColumnRenamed('_c11', 'Surface_B')
      )
    

In [85]:
df5.show(6)

+--------+---------------+-------------+-------------+--------+---------+-----------+-----------+---------+---------+---+
|    City|          State|      Country|       Region|Latitude|Longitude|Delivered_A|Delivered_B|Surface_A|Surface_B| ID|
+--------+---------------+-------------+-------------+--------+---------+-----------+-----------+---------+---------+---+
|    City|State/ Province|      Country|       Region|    null|     null|       null|       null|     null|     null|  4|
|   Blair|             NE|United States|NORTH AMERICA|    42.0|    -96.0|        0.0|   688905.0|      0.0|      0.0|  5|
|   Blair|             NE|United States|NORTH AMERICA|    42.0|    -96.0|        0.0|  9488173.0|      0.0|      0.0|  6|
| Memphis|             TN|United States|NORTH AMERICA|    35.0|    -90.0|        0.0|  7037363.0|      0.0|      0.0|  7|
|Songyuan|          Jilin|        China| ASIA/PACIFIC|    45.0|    125.0|        0.0|  3315709.0|      0.0|      0.0|  8|
| Krefeld|        GERMAN

In [91]:
df5 = df5.filter(df5.ID != 4)
df6 = df5.drop('ID')

In [92]:
df6.show(5)

+--------+-------+-------------+-------------+--------+---------+-----------+-----------+---------+---------+
|    City|  State|      Country|       Region|Latitude|Longitude|Delivered_A|Delivered_B|Surface_A|Surface_B|
+--------+-------+-------------+-------------+--------+---------+-----------+-----------+---------+---------+
|   Blair|     NE|United States|NORTH AMERICA|    42.0|    -96.0|        0.0|   688905.0|      0.0|      0.0|
|   Blair|     NE|United States|NORTH AMERICA|    42.0|    -96.0|        0.0|  9488173.0|      0.0|      0.0|
| Memphis|     TN|United States|NORTH AMERICA|    35.0|    -90.0|        0.0|  7037363.0|      0.0|      0.0|
|Songyuan|  Jilin|        China| ASIA/PACIFIC|    45.0|    125.0|        0.0|  3315709.0|      0.0|      0.0|
| Krefeld|GERMANY|      Germany|       EUROPE|    51.0|      7.0|        0.0|    13295.0|      0.0|2901482.0|
+--------+-------+-------------+-------------+--------+---------+-----------+-----------+---------+---------+
only showi

In [121]:
df6.registerTempTable("WaterInfo")
Countries = sqlContext.sql("select distinct Country, count(Country) as cnt from WaterInfo group by Country order by cnt desc")
States = sqlContext.sql("select distinct State, count(State) as snt from WaterInfo group by State order by snt desc")

In [122]:
# Countries.collect()

In [123]:
Countries.show()

+--------------+---+
|       Country|cnt|
+--------------+---+
| United States| 37|
|         China| 11|
|       Belgium| 10|
|United Kingdom| 10|
|   Netherlands| 10|
|        Brazil|  9|
|       Germany|  7|
|        Canada|  6|
|     Argentina|  5|
|        Russia|  5|
|         Spain|  5|
|     Australia|  4|
|      Thailand|  3|
|         India|  3|
|        France|  2|
|      Malaysia|  2|
|        Poland|  1|
|     Venezuela|  1|
|         Italy|  1|
|       Ukraine|  1|
+--------------+---+
only showing top 20 rows



In [124]:
States.show()

+--------------+---+
|         State|snt|
+--------------+---+
|   NETHERLANDS| 10|
|       BELGIUM|  9|
|             0|  7|
|       ENGLAND|  6|
|            TX|  6|
|  Minas Gerais|  5|
|            IA|  5|
|        RUSSIA|  5|
|       GERMANY|  5|
|            MO|  4|
|         SPAIN|  4|
|UNITED KINGDOM|  4|
|      THAILAND|  3|
|        BRAZIL|  3|
|            TN|  3|
|      Santa Fe|  3|
|            NE|  3|
|         India|  2|
|            SK|  2|
|     Guangdong|  2|
+--------------+---+
only showing top 20 rows



In [129]:
Germany_Delivered = sqlContext.sql("select City, Delivered_A, Delivered_B from WaterInfo where Country = 'Germany' order by City")

In [130]:
Germany_Delivered.show()

+-------+-----------+-----------+
|   City|Delivered_A|Delivered_B|
+-------+-----------+-----------+
|  Barby|        0.0|  1543247.0|
|Krefeld|        0.0|    13295.0|
|  Mainz|        0.0|     7573.0|
|  Mainz|        0.0|      406.0|
|  Riesa|        0.0|     2628.0|
|  Riesa|        0.0|      919.0|
|  Riesa|        0.0|      679.0|
+-------+-----------+-----------+



In [131]:
Belgium_Surface = sqlContext.sql("select City, Surface_A, Surface_B from WaterInfo where Country = 'Belgium' order by City")

In [132]:
Belgium_Surface.show()

+-------+---------+---------+
|   City|Surface_A|Surface_B|
+-------+---------+---------+
|Antwerp|      0.0|      0.0|
|Antwerp|      0.0|      0.0|
|Antwerp|      0.0|      0.0|
|  Ghent|      0.0|      0.0|
|  Ghent|      0.0|      0.0|
|  Ghent|      0.0|      0.0|
|  Ghent|      0.0|      0.0|
| Herent|      0.0| 523690.0|
| Izegem|1126547.0|      0.0|
| Izegem|      0.0|      0.0|
+-------+---------+---------+

