# Loading and Saving Data in Spark

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 38 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 52.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=cc8ad9e4236414a89b01dd2e3969b45abadb43cbb7b7a6c7efe63c038b73fc7f
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [49]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport()\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [33]:
from pyspark.context import SparkContext
sc = spark.sparkContext

In [50]:
spark



Load the example1.json JSON file 

In [2]:
from google.colab import files
files.upload()

{}

In [None]:
jsonDF = spark.read.json('example1.json')
jsonDF.printSchema()

# Saving Text Files

Using the USDA_activity_dataset_csv dataset 

In [4]:
from google.colab import files
files.upload()

Saving Mall_Customers.csv to Mall_Customers.csv


{'Mall_Customers.csv': b'CustomerID,Genre,Age,Annual Income (k$),Spending Score (1-100)\r\n0001,Male,19,15,39\r\n0002,Male,21,15,81\r\n0003,Female,20,16,6\r\n0004,Female,23,16,77\r\n0005,Female,31,17,40\r\n0006,Female,22,17,76\r\n0007,Female,35,18,6\r\n0008,Female,23,18,94\r\n0009,Male,64,19,3\r\n0010,Female,30,19,72\r\n0011,Male,67,19,14\r\n0012,Female,35,19,99\r\n0013,Female,58,20,15\r\n0014,Female,24,20,77\r\n0015,Male,37,20,13\r\n0016,Male,22,20,79\r\n0017,Female,35,21,35\r\n0018,Male,20,21,66\r\n0019,Male,52,23,29\r\n0020,Female,35,23,98\r\n0021,Male,35,24,35\r\n0022,Male,25,24,73\r\n0023,Female,46,25,5\r\n0024,Male,31,25,73\r\n0025,Female,54,28,14\r\n0026,Male,29,28,82\r\n0027,Female,45,28,32\r\n0028,Male,35,28,61\r\n0029,Female,40,29,31\r\n0030,Female,23,29,87\r\n0031,Male,60,30,4\r\n0032,Female,21,30,73\r\n0033,Male,53,33,4\r\n0034,Male,18,33,92\r\n0035,Female,49,33,14\r\n0036,Female,21,33,81\r\n0037,Female,42,34,17\r\n0038,Female,30,34,73\r\n0039,Female,36,37,26\r\n0040,Female

**Reading a CSV file into a DataFrame, filter some columns and save it**

In [5]:
data = spark.read.csv('/content/Mall_Customers.csv',inferSchema=True, header=True)

In [6]:
type(data)

pyspark.sql.dataframe.DataFrame

In [7]:
data.show(10)

+----------+------+---+------------------+----------------------+
|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+------+---+------------------+----------------------+
|         1|  Male| 19|                15|                    39|
|         2|  Male| 21|                15|                    81|
|         3|Female| 20|                16|                     6|
|         4|Female| 23|                16|                    77|
|         5|Female| 31|                17|                    40|
|         6|Female| 22|                17|                    76|
|         7|Female| 35|                18|                     6|
|         8|Female| 23|                18|                    94|
|         9|  Male| 64|                19|                     3|
|        10|Female| 30|                19|                    72|
+----------+------+---+------------------+----------------------+
only showing top 10 rows



In [8]:
data.columns

['CustomerID', 'Genre', 'Age', 'Annual Income (k$)', 'Spending Score (1-100)']

In [56]:
data.createOrReplaceTempView("temp_table")

Filter data by several columns

In [9]:
dataF=data.select('CustomerID', 'Genre', 'Age',)

Save only the filtered Data

In [10]:
dataF.write.csv("USDA_income_poverty.csv")

Let's read this new file back into an RDD

In [None]:
rddL=sc.textFile("USDA_income_poverty.csv",)
rddL.collect()

# **Hive Example**

Using Hive to create and read a table - Simple Example

In [63]:
from pyspark.sql import Row,SQLContext
from pyspark.sql import HiveContext
#sqlContext = HiveContext(sc)
test_list = [('A', 25),('B', 20),('C', 25),('D', 18)]
rdd = sc.parallelize(test_list)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
#schemaPeople = sqlContext.createDataFrame(people)
schemaPeople = spark.createDataFrame(people)
# Register it as a temp table
schemaPeople.createOrReplaceTempView('schemaPeople')
#schemaPeople.registerDataFrameAsTable(schemaPeople, "test_table")
spark.sql("show tables").show()


+---------+------------+-----------+
|namespace|   tableName|isTemporary|
+---------+------------+-----------+
|         |         ex1|       true|
|         |schemapeople|       true|
|         |  temp_table|       true|
|         |  test_table|       true|
+---------+------------+-----------+



In [64]:
sqlContext.sql("Select * from schemapeople").show()

+----+---+
|name|age|
+----+---+
|   A| 25|
|   B| 20|
|   C| 25|
|   D| 18|
+----+---+



Let's query the table

In [21]:
sqlContext.sql("Select * from test_table").show()

+----+---+
|name|age|
+----+---+
|   A| 25|
|   B| 20|
|   C| 25|
|   D| 18|
+----+---+



In [65]:
spark.sql("Select * from test_table").show()

+----+---+
|name|age|
+----+---+
|   A| 25|
|   B| 20|
|   C| 25|
|   D| 18|
+----+---+



**Load a JSON file with Hive and use SQL on it**

In [66]:
## Colab code only - DO NOT run outsie of colab
from google.colab import files  
files.upload()

Saving simple.json to simple.json


{'simple.json': b'\'{"name":"John", "age":30, "car":23}\''}

Let's load example1.json with Hive a do a Select Statement on it

In [73]:
from pyspark.sql import HiveContext
#hiveCtx = HiveContext(sc)
#ex1 = hiveCtx.read.json("simple.json")
ex1 = spark.read.json("simple.json")
ex1.createOrReplaceTempView("simple")
spark.sql('select * from simple').show()
#results = hiveCtx.sql("SELECT  FROM ex1").show()

+----+
|name|
+----+
|John|
+----+



In [28]:
data.toPandas()

Unnamed: 0,CustomerID,Genre,Age,Annual Income (k$),Spending Score (1-100)
0,1,Male,19,15,39
1,2,Male,21,15,81
2,3,Female,20,16,6
3,4,Female,23,16,77
4,5,Female,31,17,40
...,...,...,...,...,...
195,196,Female,35,120,79
196,197,Female,45,126,28
197,198,Male,32,126,74
198,199,Male,32,137,18


In [29]:
data.show(2)

+----------+-----+---+------------------+----------------------+
|CustomerID|Genre|Age|Annual Income (k$)|Spending Score (1-100)|
+----------+-----+---+------------------+----------------------+
|         1| Male| 19|                15|                    39|
|         2| Male| 21|                15|                    81|
+----------+-----+---+------------------+----------------------+
only showing top 2 rows

