#### Copy files to HDFS

In [None]:
!hdfs dfs -copyFromLocal tweets.json tweets.json
!hdfs dfs -copyFromLocal tweets.json.gz tweets.json.gz

#### Conversion from JSON to Parquet

In [3]:
from pyspark.sql import SQLContext

# NOTE(review): `sc` is not created in this notebook; presumably it is the
# SparkContext injected by the PySpark kernel — confirm for your setup.
sqlContext = SQLContext(sc)
df = sqlContext.read.json("tweets.json")
# mode="overwrite" makes the cell re-runnable; the default save mode raises
# an error when tweets.parquet already exists.
df.write.parquet("tweets.parquet", mode="overwrite")

In [28]:
!hdfs dfs -du -h #The size of the parquet is actually little less than that of gzip file.

1.7 M    .sparkStaging
74.8 K   input
185.9 M  server.log
449.0 M  tweets.json
63.3 M   tweets.json.gz
59.4 M   tweets.parquet


# Count retweets among all tweets

### JSON source

In [39]:
%time df_json = sqlContext.read.json("tweets.json")
%time df_json.registerTempTable("tweets_json")
%time sqlContext.sql("select count(id), count(retweeted_status.id) from tweets_json").show()

CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 28.4 s
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 1.8 ms
+-----+
|  _c0|
+-----+
|47934|
+-----+

CPU times: user 0 ns, sys: 10 ms, total: 10 ms
Wall time: 8.9 s


### JSON source with gzip compression

In [29]:
%time df_json_gz = sqlContext.read.json("tweets.json.gz")
%time df_json_gz.registerTempTable("tweets_json_gz")
%time sqlContext.sql("select count(id), count(retweeted_status.id) from tweets_json_gz").show()

CPU times: user 20 ms, sys: 0 ns, total: 20 ms
Wall time: 31 s
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 339 µs
+------+-----+
|   _c0|  _c1|
+------+-----+
|139434|47934|
+------+-----+

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 9.09 s


### Parquet source

In [43]:
%time df_parquet = sqlContext.read.parquet("tweets.parquet")
%time df_parquet.registerTempTable("tweets_parquet")
%time sqlContext.sql("select count(id), count(retweeted_status.id) from tweets_parquet").show()

CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 293 ms
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 1.94 ms
+------+-----+
|   _c0|  _c1|
+------+-----+
|139434|47934|
+------+-----+

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 3.13 s


# Removing a specific tweet (by id) from the files

## JSON Source and Destination

In [36]:
!hdfs dfs -rm -r tweets_filtered.json.gz 
%time df_json_gz = sqlContext.read.json("tweets.json.gz")
%time df_json_gz.registerTempTable("tweets_json_gz")
%time df_json_gz = sqlContext.sql("select * from tweets_json_gz where id!=459403052467113984")
%time df_json_gz.write.json("tweets_filtered.json.gz")

rm: `tweets_filtered.json.gz': No such file or directory
CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 28.8 s
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 360 µs
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 26.9 ms
CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 26.4 s


## Parquet Source and Destination

In [35]:
!hdfs dfs -rm -r tweets_filtered.parquet 
%time df_parquet = sqlContext.read.parquet("tweets.parquet")
%time df_parquet.registerTempTable("tweets_parquet")
%time df =  sqlContext.sql("select * from tweets_parquet where id!=459403052467113984")
%time df.write.parquet("tweets_filtered.parquet")

rm: `tweets_filtered.parquet': No such file or directory
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 179 ms
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 283 µs
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 47.9 ms
CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 49.2 s
