In [2]:
!pip install pyspark

Collecting pyspark
  Using cached pyspark-3.2.1-py2.py3-none-any.whl
Collecting py4j==0.10.9.3
  Using cached py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [3]:
import pyspark

In [4]:
import pandas as pd

In [5]:
from pyspark.sql import SparkSession

In [6]:
## Spark needs a sparksession
spark=SparkSession.builder.appName('Practice').getOrCreate()

22/05/15 11:04:02 WARN Utils: Your hostname, Hamiltons-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.12 instead (on interface en0)
22/05/15 11:04:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/15 11:04:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
spark

In [19]:
## Reading a multi-line dataset

df_pyspark = spark.read.json('tenth_scrape.json', multiLine=True)

TypeError: json() got an unexpected keyword argument 'inferSchema'

In [17]:
spark.read.option('header', 'true').json('tenth_scrape.json', multiLine=True).head(1)



In [18]:
## print the schema
df_pyspark.printSchema()

root
 |-- conversation_id: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- favorite_count: long (nullable = true)
 |-- full_text: string (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: string (nullable = true)
 |-- reply_count: long (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- symbols: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- url: string (nullable = true)
 |-- urls: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- display_url: string (nullable = true)
 |    |    |-- expanded_url: string (nullable = true)
 |    |    |-- url: string (nullable = true)
 |-- user: struct (nullable = true)
 |    |-- advertiser_account_type: string (nullable = true)
 |    |-- blocked_by: string (nullable = true)
 |    |-- blocking: string (nullable = true)
 |    |-- business_profile_state: string (nullable = true)
 |    |-- can

In [21]:
## show columns
df_pyspark.columns

['conversation_id',
 'created_at',
 'favorite_count',
 'full_text',
 'hashtags',
 'id',
 'reply_count',
 'retweet_count',
 'symbols',
 'url',
 'urls',
 'user',
 'user_mentions']

In [26]:
## how to show specific columns in Spark, single and multiple
df_pyspark.select(['conversation_id', 'full_text']).show()

+-------------------+--------------------+
|    conversation_id|           full_text|
+-------------------+--------------------+
|1518601977441054726|Lando was out of ...|
|1518266265097355267|DRIVER STANDINGS ...|
|1518234578263097345|LAP 54/53 \n\nLec...|
|1518199409460363264|Forever a legend ...|
|1518253457723039744|Peep the new @F1 ...|
|1517870164351414273|Vettel fingiendo ...|
|1518241098904219648|World champion Ma...|
|1518578572818108417|What. A. Performa...|
|1518263201443553282|charles leclerc g...|
|1518253681837232129|Bahrain P4\nSaudi...|
|1518486434591739904|I love this team ...|
|1518237408927563776|P3... AGAIN! üôåüèÜ...|
|1518225800797040641|Minister of defen...|
|1518283717827043331|Possibly warm tak...|
|1517897486995796419|e cabe ao charles...|
|1517565844225765376|Hamilton y Russel...|
|1518234346230099969|Lap 55/63: After ...|
|1518258482088185859|Bottas isn't play...|
|1518545457651269633|Next stop... MIAM...|
|1518245198610026496|Belissimo, Bulls ...|
+------

In [27]:
## Showing types of all columns
df_pyspark.dtypes

[('conversation_id', 'string'),
 ('created_at', 'string'),
 ('favorite_count', 'bigint'),
 ('full_text', 'string'),
 ('hashtags', 'array<string>'),
 ('id', 'string'),
 ('reply_count', 'bigint'),
 ('retweet_count', 'bigint'),
 ('symbols', 'array<string>'),
 ('url', 'string'),
 ('urls', 'array<struct<display_url:string,expanded_url:string,url:string>>'),
 ('user',
 ('user_mentions',
  'array<struct<id_str:string,name:string,screen_name:string>>')]

In [30]:
## Summary of the spark dataframe
df_pyspark.describe()

DataFrame[summary: string, conversation_id: string, created_at: string, favorite_count: string, full_text: string, id: string, reply_count: string, retweet_count: string, url: string]

In [35]:
## adding columns in data spark dataframe
df_pyspark = df_pyspark.withColumn('Number of replies', df_pyspark['reply_count'])

In [37]:
## Testing that new column was added
df_pyspark.columns

['conversation_id',
 'created_at',
 'favorite_count',
 'full_text',
 'hashtags',
 'id',
 'reply_count',
 'retweet_count',
 'symbols',
 'url',
 'urls',
 'user',
 'user_mentions',
 'Number of replies']

In [39]:
## Drop columns testing
df_pyspark.drop('Number of replies').columns

['conversation_id',
 'created_at',
 'favorite_count',
 'full_text',
 'hashtags',
 'id',
 'reply_count',
 'retweet_count',
 'symbols',
 'url',
 'urls',
 'user',
 'user_mentions']

In [40]:
## Rename the columns
df_pyspark.withColumnRenamed('user_mentions', 'mentions').columns

['conversation_id',
 'created_at',
 'favorite_count',
 'full_text',
 'hashtags',
 'id',
 'reply_count',
 'retweet_count',
 'symbols',
 'url',
 'urls',
 'user',
 'mentions',
 'Number of replies']