# Bikes

Here we load a csv, make some transformations with SQL and save to different formats.

In [1]:
from platform import python_version

print( python_version() )

3.10.11


In [2]:
import sys

print( sys.executable )


/home/art/git/data_engineer/venv/bin/python


In [3]:
print( sys.version_info )

sys.version_info(major=3, minor=10, micro=11, releaselevel='final', serial=0)


In [4]:
!jupyter --version

Selected Jupyter core packages...
IPython          : 8.14.0
ipykernel        : 6.23.1
ipywidgets       : 8.0.6
jupyter_client   : 8.2.0
jupyter_core     : 5.3.0
jupyter_server   : 2.6.0
jupyterlab       : not installed
nbclient         : 0.8.0
nbconvert        : 7.4.0
nbformat         : 5.9.0
notebook         : 6.5.4
qtconsole        : 5.4.3
traitlets        : 5.9.0


In [5]:
from IPython.display        import display, HTML

import findspark
findspark.init()
import pyspark

#from pyspark.context import SparkContext
from pyspark.sql            import SparkSession
from pyspark.sql.functions  import col, explode, udf
from pyspark.sql.types      import Row, StringType, LongType, ArrayType



In [6]:
pyspark.__version__


'3.4.0'

#### avro package

Remember that this is an external package and we need to load it from here.

In [9]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.12:3.4.0 pyspark-shell'


### create Spark Session

In [10]:
spark = SparkSession.builder \
        .appName( 'bikes' )  \
        .getOrCreate()


:: loading settings :: url = jar:file:/home/art/git/data_engineer/venv/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/art/.ivy2/cache
The jars for the packages stored in: /home/art/.ivy2/jars
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2673de7e-1c83-4b9f-8a7c-134521e99a41;1.0
	confs: [default]
	found org.apache.spark#spark-avro_2.12;3.4.0 in central
	found org.tukaani#xz;1.9 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.4.0/spark-avro_2.12-3.4.0.jar ...
	[SUCCESSFUL ] org.apache.spark#spark-avro_2.12;3.4.0!spark-avro_2.12.jar (150ms)
downloading https://repo1.maven.org/maven2/org/tukaani/xz/1.9/xz-1.9.jar ...
	[SUCCESSFUL ] org.tukaani#xz;1.9!xz.jar (95ms)
:: resolution report :: resolve 1979ms :: artifacts dl 253ms
	:: modules in use:
	org.apache.spark#spark-avro_2.12;3.4.0 from central in [default]
	org.tukaani#xz;1.9 from central in [default]
	---------------------------------------------------------------------
	|                  |            modu

In [11]:
file_name = '/home/art/data/202004-capitalbikeshare/bike_100.csv'


In [12]:
df = spark.read.csv( file_name )
df

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string, _c12: string]

In [13]:
df.show( 3, truncate = False )

+----------------+-------------+-------------------+-------------------+---------------------------------+----------------+----------------+--------------+----------------+-----------------+----------------+-----------------+-------------+
|_c0             |_c1          |_c2                |_c3                |_c4                              |_c5             |_c6             |_c7           |_c8             |_c9              |_c10            |_c11             |_c12         |
+----------------+-------------+-------------------+-------------------+---------------------------------+----------------+----------------+--------------+----------------+-----------------+----------------+-----------------+-------------+
|ride_id         |rideable_type|started_at         |ended_at           |start_station_name               |start_station_id|end_station_name|end_station_id|start_lat       |start_lng        |end_lat         |end_lng          |member_casual|
|77A0F1B26D1597B1|docked_bike  |2020-04-

In [14]:
df.createOrReplaceTempView( 'df' )

In [15]:
spark.sql(
'''
select * 
from df
'''
).show()

+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+----------------+-----------------+----------------+-----------------+-------------+
|             _c0|          _c1|                _c2|                _c3|                 _c4|             _c5|                 _c6|           _c7|             _c8|              _c9|            _c10|             _c11|         _c12|
+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+----------------+-----------------+----------------+-----------------+-------------+
|         ride_id|rideable_type|         started_at|           ended_at|  start_station_name|start_station_id|    end_station_name|end_station_id|       start_lat|        start_lng|         end_lat|          end_lng|member_casual|
|77A0F1B26D1597B1|  docked_bike|2020-04-25 17:28:39|2020-04-25 17:35:04|Rhod

### read csv with header

In [16]:
df = spark.read                 \
     .option( 'header', True  ) \
     .csv( file_name )

In [17]:
df.printSchema()

root
 |-- ride_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- started_at: string (nullable = true)
 |-- ended_at: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- start_lat: string (nullable = true)
 |-- start_lng: string (nullable = true)
 |-- end_lat: string (nullable = true)
 |-- end_lng: string (nullable = true)
 |-- member_casual: string (nullable = true)



In [18]:
df.show()

+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+----------------+-----------------+----------------+-----------------+-------------+
|         ride_id|rideable_type|         started_at|           ended_at|  start_station_name|start_station_id|    end_station_name|end_station_id|       start_lat|        start_lng|         end_lat|          end_lng|member_casual|
+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+----------------+-----------------+----------------+-----------------+-------------+
|77A0F1B26D1597B1|  docked_bike|2020-04-25 17:28:39|2020-04-25 17:35:04|Rhode Island & Co...|           31239|      12th & L St NW|         31251|38.9059956999911|-77.0398020744324|       38.903819|         -77.0284|       casual|
|8698F10128EA4F18|  docked_bike|2020-04-06 07:54:59|2020-04-06 07:57:24|    

### read csv and infer schema

In [19]:
df = spark.read   \
     .options( header = True, inferSchema = True, delimiter = ','  ) \
     .csv( file_name )

df.printSchema()

root
 |-- ride_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- started_at: timestamp (nullable = true)
 |-- ended_at: timestamp (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- start_lat: double (nullable = true)
 |-- start_lng: double (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lng: double (nullable = true)
 |-- member_casual: string (nullable = true)



In [20]:
df.createOrReplaceTempView( 'df' )

In [21]:
spark.sql(
'''
select ride_id, start_station_name, started_at
from df
limit 3

'''
).show()

+----------------+--------------------+-------------------+
|         ride_id|  start_station_name|         started_at|
+----------------+--------------------+-------------------+
|77A0F1B26D1597B1|Rhode Island & Co...|2020-04-25 17:28:39|
|8698F10128EA4F18|      21st & I St NW|2020-04-06 07:54:59|
|AA07819DC0F58872|Connecticut Ave &...|2020-04-22 17:06:18|
+----------------+--------------------+-------------------+



In [22]:
df_02 = spark.sql(
'''
select start_station_name, count( * ) as num_rides
from df
group by 1
order by 2 DESC
'''
)

df_02.show()

+--------------------+---------+
|  start_station_name|num_rides|
+--------------------+---------+
|Reston Pkwy & Spe...|       21|
|Potomac & Pennsyl...|        7|
|Market Square / K...|        6|
|Rhodes St & 16th ...|        6|
|Rhode Island & Co...|        6|
|2nd St & Massachu...|        3|
|E Montgomery Ave ...|        3|
|       1st & D St SE|        3|
|Bethesda Ave & Ar...|        2|
| 5th & Kennedy St NW|        2|
| 12th & Irving St NE|        2|
|North Shore Dr & ...|        2|
|       3rd & H St NW|        2|
|Columbia Rd & Geo...|        2|
|      11th & S St NW|        2|
|New York Ave & He...|        2|
|1st & Rhode Islan...|        2|
|       6th & K St NE|        2|
|14th & Harvard St NW|        2|
|Needwood Rd & Eag...|        1|
+--------------------+---------+
only showing top 20 rows



In [23]:
df_03 = spark.sql(
'''
select start_station_name
from df
'''
)

df_03.show()

+--------------------+
|  start_station_name|
+--------------------+
|Rhode Island & Co...|
|      21st & I St NW|
|Connecticut Ave &...|
|       7th & E St SW|
|Potomac & Pennsyl...|
|Potomac & Pennsyl...|
|Potomac & Pennsyl...|
|Calvert & Biltmor...|
|Needwood Rd & Eag...|
|Market Square / K...|
|Market Square / K...|
|Rhode Island & Co...|
|Market Square / K...|
|Market Square / K...|
|Market Square / K...|
|Market Square / K...|
|14th & Harvard St NW|
|14th & Harvard St NW|
|New York Ave & He...|
|2nd St & Massachu...|
+--------------------+
only showing top 20 rows



In [24]:
df_03.describe().show()

+-------+--------------------+
|summary|  start_station_name|
+-------+--------------------+
|  count|                  99|
|   mean|                null|
| stddev|                null|
|    min|      10th & G St NW|
|    max|Veterans Pl & Per...|
+-------+--------------------+



### make joins

In [25]:
rides = spark.sql(
'''
select ride_id, start_station_id, end_station_id
from df
'''
)

rides.show()

+----------------+----------------+--------------+
|         ride_id|start_station_id|end_station_id|
+----------------+----------------+--------------+
|77A0F1B26D1597B1|           31239|         31251|
|8698F10128EA4F18|           31205|         31224|
|AA07819DC0F58872|           31313|         31313|
|DA909BCA92EF85AB|           31294|         31294|
|B36F1E14D8C6757E|           31606|         31608|
|3C10F9AE61844C89|           31606|         31608|
|361BF81F8528597B|           31606|         31608|
|8087C5803C444CA7|           31106|         31113|
|D40BA9257FD7F9BC|           32018|         32015|
|C19F084E1BA87FB9|           31042|         31101|
|36B247781317A0A3|           31042|         31101|
|01D6659BA9B9E8F8|           31239|         31101|
|E52175B3A27ADBB1|           31042|         31064|
|21E11004E625B42D|           31042|         31064|
|0587F0A7A4DED258|           31042|         31064|
|A871CB18FC833DE2|           31042|         31064|
|7C08C151319EB7F6|           31

In [26]:
stations = spark.sql(
'''
select start_station_id as station_id, start_station_name as station_name
from df
'''
)

stations.show()

+----------+--------------------+
|station_id|        station_name|
+----------+--------------------+
|     31239|Rhode Island & Co...|
|     31205|      21st & I St NW|
|     31313|Connecticut Ave &...|
|     31294|       7th & E St SW|
|     31606|Potomac & Pennsyl...|
|     31606|Potomac & Pennsyl...|
|     31606|Potomac & Pennsyl...|
|     31106|Calvert & Biltmor...|
|     32018|Needwood Rd & Eag...|
|     31042|Market Square / K...|
|     31042|Market Square / K...|
|     31239|Rhode Island & Co...|
|     31042|Market Square / K...|
|     31042|Market Square / K...|
|     31042|Market Square / K...|
|     31042|Market Square / K...|
|     31105|14th & Harvard St NW|
|     31105|14th & Harvard St NW|
|     31518|New York Ave & He...|
|     31641|2nd St & Massachu...|
+----------+--------------------+
only showing top 20 rows



In [27]:
rides   .createOrReplaceTempView( 'rides' )
stations.createOrReplaceTempView( 'stations' )

In [28]:
spark.sql(
'''
select 
r.ride_id, r.start_station_id, r.end_station_id,

s.station_name as start_station_name, 

e.station_name as end_station_name 

from rides r 
   INNER JOIN stations s ON r.start_station_id = s.station_id
   INNER JOIN stations e ON r.end_station_id   = e.station_id
   
'''
).show()

+----------------+----------------+--------------+--------------------+--------------------+
|         ride_id|start_station_id|end_station_id|  start_station_name|    end_station_name|
+----------------+----------------+--------------+--------------------+--------------------+
|AA07819DC0F58872|           31313|         31313|Connecticut Ave &...|Connecticut Ave &...|
|DA909BCA92EF85AB|           31294|         31294|       7th & E St SW|       7th & E St SW|
|B36F1E14D8C6757E|           31606|         31608|Potomac & Pennsyl...|8th & Eye St SE /...|
|B36F1E14D8C6757E|           31606|         31608|Potomac & Pennsyl...|8th & Eye St SE /...|
|B36F1E14D8C6757E|           31606|         31608|Potomac & Pennsyl...|8th & Eye St SE /...|
|B36F1E14D8C6757E|           31606|         31608|Potomac & Pennsyl...|8th & Eye St SE /...|
|B36F1E14D8C6757E|           31606|         31608|Potomac & Pennsyl...|8th & Eye St SE /...|
|B36F1E14D8C6757E|           31606|         31608|Potomac & Pennsyl...

### save dataframe to file

In [29]:
df.write \
  .option( 'header' , 'true' ) \
  .csv( '/home/art/data/output/df.csv' )

In [30]:
df.write \
  .json( '/home/art/data/output/df.json' )

In [31]:
#df.show()

In [32]:
# if dataframe has 1 column, then you can write to text file
# else, it will throw error

df_03.write.text( '/home/art/data/output/stations.txt' )



In [33]:
df.write            \
  .format( 'avro' ) \
  .save( '/home/art/data/output/df.avro' )

In [34]:
df.write \
  .parquet( '/home/art/data/output/df.parquet' )

                                                                                

### links

How to run pyspark from jupyter

* https://medium.com/sicara/get-started-pyspark-jupyter-guide-tutorial-ae2fe84f594f

Course examples

* https://github.com/rudrasingh21/Spark-By-Janani-Ravi/tree/master