# Get Tweets

This script extracts all the tweets with hashtag #covid-19 related to the day before today (yesterday) and saves them into a .csv file.
We use the `tweepy` library, which can be installed with the command `pip install tweepy`.

Firstly, we import the configuration file, called `config.py`, which is located in the same directory of this script.

In [1]:
import os
jv = os.environ.get('JAVA_HOME', None)

import findspark
findspark.init()

os.environ['PYSPARK_SUBMIT_ARGS'] = \
'--packages com.johnsnowlabs.nlp:spark-nlp_2.12:3.0.0 pyspark-shell'
# '--packages org.postgresql:postgresql:42.1.1 pyspark-shell'

sys.path


['/opt/spark/python',
 '/opt/spark/python/lib/py4j-0.10.9-src.zip',
 '/home/hadoopuser/Documents/twitter_search_api',
 '/home/hadoopuser/.vscode/extensions/ms-toolsai.jupyter-2021.6.999662501/pythonFiles/vscode_datascience_helpers',
 '/home/hadoopuser/.vscode/extensions/ms-toolsai.jupyter-2021.6.999662501/pythonFiles',
 '/home/hadoopuser/.vscode/extensions/ms-toolsai.jupyter-2021.6.999662501/pythonFiles/lib/python',
 '/opt/anaconda/envs/pyspark_env/lib/python37.zip',
 '/opt/anaconda/envs/pyspark_env/lib/python3.7',
 '/opt/anaconda/envs/pyspark_env/lib/python3.7/lib-dynload',
 '',
 '/opt/anaconda/envs/pyspark_env/lib/python3.7/site-packages',
 '/opt/anaconda/envs/pyspark_env/lib/python3.7/site-packages/IPython/extensions',
 '/home/hadoopuser/.ipython']

In [2]:
import sys, glob, os
sys.path.extend(glob.glob(os.path.join(os.path.expanduser("~"), ".ivy2/jars/*.jar")))

In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


spark=SparkSession.builder \
.appName("Spark_and_Pandas_twitter_dfs") \
.master("local[*]") \
.getOrCreate()

In [4]:
spark

In [5]:
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")



In [6]:
spark.conf.get("spark.sql.execution.arrow.pyspark.enabled")

'true'

In [7]:
spark.conf.get("spark.sql.execution.arrow.pyspark.fallback.enabled")

'true'

```
import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()
```


`result_pdf.head(2)`

In [8]:
import mypy
from config import *
import tweepy
import datetime

import sys
import logging

logger = logging.getLogger('tweets_search')

In [9]:
import pandas as pd
# import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer

In [10]:
print(f"logger.root.level = {logger.root.level}, logger.root.name = {logger.root.name}")
print(f"logger.name = {logger.name}")

logger.root.level = 30, logger.root.name = root
logger.name = tweets_search


In [11]:
format = "%(asctime)s - %(levelname)s - %(message)s"
# logging.basicConfig(format=format, stream=sys.stdout, level = logging.DEBUG)
logging.basicConfig(format=format, stream=sys.stdout, level = logging.INFO)

In [12]:
print(logger.root.level)

20


In [17]:
# logger.root.level = 10

In [18]:
# print(logger.root.level)

10


We setup the connection to our Twitter App by using the `OAuthHandler()` class and its `access_token()` function. Then we call the Twitter API through the `API()` function.

In [13]:
auth = tweepy.OAuthHandler(TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET)
auth.set_access_token(TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_TOKEN_SECRET)
api = tweepy.API(auth,wait_on_rate_limit=True, wait_on_rate_limit_notify = True)

In [1]:
# api.me()

In [2]:
# api.rate_limit_status()

## setup dates (recent 7 days max)
```
pdf['created_at'].dt.strftime('%Y-%m-%d')
- provide since and until dates (date_format = 'Y-m-d', i.e '2021-01-30')
since:'', until:'', or
- provide timedelta (until= datetime.date.today(), since= datetime.date.today()-timedelta), i.e timedelta: '2', or
- setup


Now we setup dates. We need to setup today and yesterday.
```

## setup dates (recent 7 days max)

If today is 2021-06-26 then :

1. `time_frame = {timedelta:'2'}` (we get tweets from 2021-0-24 up to 2021-06-25 (today - 1 day))
2. `time_frame = {since:'2021-06-23', timedelta:'2'}` 
3. `time_frame = {until:'2021-06-25', timedelta:'2'}` (2 & 3 & 4 expressions are equivalent)
4. `time_frame = {since:'2021-06-23', until:'2021-06-25'}` -> we get tweets from 2021-06-23 up to 2021-06-24

`note:` from today we can get a time_frame of 7 days max, i.e since 2021-06-19

In [14]:
# https://www.educative.io/edpresso/how-to-convert-a-string-to-a-date-in-python
import datetime
since = '2021-01-30'
datetime.datetime.strptime(since, '%Y-%m-%d').date()

datetime.date(2021, 1, 30)

In [15]:
today = datetime.date.today()
since= today - datetime.timedelta(days=2)
until= today
until, since
# (datetime.date(2021, 6, 7), datetime.date(2021, 6, 6))

(datetime.date(2021, 6, 28), datetime.date(2021, 6, 26))

In [16]:
logger.debug(f"time_frame: '{until, since}'")

We search for tweets on Twitter by using the `Cursor()` function. 
We pass the `api.search` parameter to the cursor, as well as the query string, which is specified through the `q` parameter of the cursor.
The query string can receive many parameters, such as the following (not mandatory) ones:
* `from:` - to specify a specific Twitter user profile
* `since:` - to specify the beginning date of search
* `until:` - to specify the ending date of search
The cursor can also receive other parameters, such as the language and the `tweet_mode`. If `tweet_mode='extended'`, all the text of the tweet is returned, otherwise only the first 140 characters.

In [None]:
# # example 
# code tweets = tweepy.Cursor(api.search, tweet_mode=’extended’) 
# for tweet in tweets:
#     content = tweet.full_text

In [None]:
# tweets_list = tweepy.Cursor(api.search, q="#Covid-19 since:" + str(yesterday)+ " until:" + str(today),tweet_mode='extended', lang='en').items()

In [None]:
# tweets_list = tweepy.Cursor(api.search, q=f"#Covid-19 since:{str(yesterday)} until:{str(today)}",tweet_mode='extended', lang='en').items()

In [24]:
# tweets_list = tweepy.Cursor(api.search, q=['astrazeneca', 'pfizer'],since= str(since), until=str(until),tweet_mode='extended', lang='en').items()

# Greek Language = el
# tweets_list = tweepy.Cursor(api.search, q=['coffee island'],since= str(since), until=str(until),tweet_mode='extended', lang='el').items()

# English Language = en
tweets_list = tweepy.Cursor(api.search, q=['coffee island OR CoffeeIsland'],since= str(since), until=str(until),tweet_mode='extended', lang='en').items()

Now we loop across the `tweets_list`, and, for each tweet, we extract the text, the creation date, the number of retweets and the favourite count. We store every tweet into a list, called `output`.

In [18]:
import time
seconds = 5
start = time.time()
time.sleep(seconds)
end = time.time()
logger.info(f"elapsed_time: '{end - start}'")

2021-06-28 21:15:19,468 - INFO - elapsed_time: '5.003556251525879'


---
# TEST

---

In [19]:
# tweets_list2 = tweepy.Cursor(api.search, q=['pfizer','astrazeneca'],since= str(since), until=str(until),tweet_mode='extended', lang='en').items(2)

import time
start = time.time()
output = []
for tweet in tweets_list:
    # text = tweet._json["full_text"]
    #print(text) 
    # https://developer.twitter.com/en/docs/twitter-api/v1/tweets/search/api-reference/get-search-tweets           
    # "geo": null,"coordinates": null,"place": null,"contributors": null,
    # "is_quote_status": false,"retweet_count": 988,"favorite_count": 3875,
    # "favorited": false,"retweeted": false,"possibly_sensitive": false,"lang": "en"
    # https://developer.twitter.com/en/docs/twitter-ids
    logger.info(f"tweet_id_str: {'-'*30}")
    logger.info(f"created_at: {tweet.created_at}")
    logger.info(f"full_text: {tweet._json['full_text']}")
    logger.info(f"tweet_id: {tweet.id}")
    logger.info(f"tweet_id_str: {tweet.id_str}")
    logger.info(f"user: {tweet._json['user']['name']}")
    logger.info(f"user_id: {tweet._json['user']['id']}")    
    # favourite_count = tweet.favorite_count
    # retweet_count = tweet.retweet_count
    # created_at = tweet.created_at
    
#     line = {'text' : text, 'favourite_count' : favourite_count, 'retweet_count' : retweet_count, 'created_at' : created_at}
#     output.append(line)
#     logger.info(f"Append list length : { len(output)}")
# end = time.time()
# logger.info(f"elapsed_time: '{end - start}'")

t™️
2021-06-28 21:16:22,730 - INFO - user_id: 1194346351179501576
2021-06-28 21:16:22,733 - INFO - tweet_id_str: ------------------------------
2021-06-28 21:16:22,734 - INFO - created_at: 2021-06-26 14:27:32
2021-06-28 21:16:22,735 - INFO - full_text: ***Very Strawberry &amp; Blue Cotton Candy***
Chocolate Ice Cream &amp; Vanilla Ice Cream
NSA Vanilla &amp; Campfire Crush
New York Cheesecake &amp; Pineapple Dole Soft Serve
Cold Brew Coffee Gelato &amp; Cake Batter
Sweet Coconut &amp; Island Banana
Cookie n' Cream &amp; Caramel Sea Salt Gelato https://t.co/qT474mhoyy
2021-06-28 21:16:22,735 - INFO - tweet_id: 1408794058647035905
2021-06-28 21:16:22,736 - INFO - tweet_id_str: 1408794058647035905
2021-06-28 21:16:22,737 - INFO - user: sweetFrog Dundalk
2021-06-28 21:16:22,749 - INFO - user_id: 1895585546
2021-06-28 21:16:22,750 - INFO - tweet_id_str: ------------------------------
2021-06-28 21:16:22,751 - INFO - created_at: 2021-06-26 14:07:13
2021-06-28 21:16:22,755 - INFO - full_text:

---

In [25]:
import time
start = time.time()
output = []
for tweet in tweets_list:
    text = tweet._json["full_text"]
    #print(text) 
    # https://developer.twitter.com/en/docs/twitter-api/v1/tweets/search/api-reference/get-search-tweets           
    # "geo": null,"coordinates": null,"place": null,"contributors": null,
    # "is_quote_status": false,"retweet_count": 988,"favorite_count": 3875,
    # "favorited": false,"retweeted": false,"possibly_sensitive": false,"lang": "en"
    logger.debug(f"full_text: '{text}'")
    favourite_count = tweet.favorite_count
    retweet_count = tweet.retweet_count
    created_at = tweet.created_at
    
    line = {'text' : text, 'favourite_count' : favourite_count, 'retweet_count' : retweet_count, 'created_at' : created_at}
    output.append(line)
    logger.info(f"Append list length : { len(output)}")
end = time.time()
logger.info(f"elapsed_time: '{end - start}'")

2021-06-28 21:19:47,664 - INFO - Append list length : 1
2021-06-28 21:19:47,670 - INFO - Append list length : 2
2021-06-28 21:19:47,675 - INFO - Append list length : 3
2021-06-28 21:19:47,676 - INFO - Append list length : 4
2021-06-28 21:19:47,680 - INFO - Append list length : 5
2021-06-28 21:19:47,682 - INFO - Append list length : 6
2021-06-28 21:19:47,683 - INFO - Append list length : 7
2021-06-28 21:19:47,684 - INFO - Append list length : 8
2021-06-28 21:19:47,686 - INFO - Append list length : 9
2021-06-28 21:19:47,686 - INFO - Append list length : 10
2021-06-28 21:19:47,690 - INFO - Append list length : 11
2021-06-28 21:19:47,696 - INFO - Append list length : 12
2021-06-28 21:19:47,699 - INFO - Append list length : 13
2021-06-28 21:19:47,704 - INFO - Append list length : 14
2021-06-28 21:19:47,705 - INFO - Append list length : 15
2021-06-28 21:19:54,601 - INFO - Append list length : 16
2021-06-28 21:19:54,602 - INFO - Append list length : 17
2021-06-28 21:19:54,609 - INFO - Append 

In [26]:
output

',
  'favourite_count': 0,
  'retweet_count': 84,
  'created_at': datetime.datetime(2021, 6, 27, 1, 10, 22)},
 {'text': 'RT @jesse_martin_1: One Piece, Coffee Island #fanart #onepiece #digitalart #luffy #OnePiece1017 https://t.co/QruHqMRmrr',
  'favourite_count': 0,
  'retweet_count': 84,
  'created_at': datetime.datetime(2021, 6, 27, 0, 27, 26)},
 {'text': 'All that walking made for thirsty work so we went for a cup of coffee – Ubud Style at the Bali Pulina Plantation. https://t.co/7OapENpCnK #travel',
  'favourite_count': 0,
  'retweet_count': 0,
  'created_at': datetime.datetime(2021, 6, 27, 0, 21, 21)},
 {'text': 'Read Treasure Island and enjoy some milk out of a irish coffee glass.',
  'favourite_count': 0,
  'retweet_count': 0,
  'created_at': datetime.datetime(2021, 6, 27, 0, 6, 4)},
 {'text': '@magmacumlord literally i say water as wadda. new york is new yoak. long island? lawng eyelailand. coffee/coaffee. shit like that especially. whether they notice it or not theres def an a

In [27]:
len(output)

185

---
### create sdf from list

Finally, we convert the `output` list to a `spark DataFrame` and we store results.

In [28]:
sdf = spark.createDataFrame(output)
sdf.show(200, truncate = 30)


+-------------------+---------------+-------------+------------------------------+
|         created_at|favourite_count|retweet_count|                          text|
+-------------------+---------------+-------------+------------------------------+
|2021-06-27 23:45:08|              1|            1|Lava Lei’s signature 100% K...|
|2021-06-27 23:38:04|              0|            0|@johnpavlovitz You might ha...|
|2021-06-27 22:35:32|             16|            1|I wonder if villagers walk ...|
|2021-06-27 22:14:58|              2|            0|@parislord @FeathersOz @the...|
|2021-06-27 22:07:46|              0|            0|@Hold2LLC @TheEliKlein I'm ...|
|2021-06-27 22:00:07|              2|            1|. @WickWrites loves to trav...|
|2021-06-27 21:38:33|              0|            0|@catekitchen OH is the cake...|
|2021-06-27 21:21:35|              7|            1|Ok I’ve had enough of peopl...|
|2021-06-27 20:10:09|              0|            0|Island bound 😁

 #coffeesh...|
|2021

In [None]:
# {"fields":[{"metadata":{},"name":"created_at","nullable":true,"type":"timestamp"},{"metadata":{},"name":"favourite_count","nullable":true,"type":"long"},{"metadata":{},"name":"retweet_count","nullable":true,"type":"long"},{"metadata":{},"name":"text","nullable":true,"type":"string"}],"type":"struct"}

In [29]:
sdf.count()

185

In [30]:
sdf.show(2, truncate =30)

+-------------------+---------------+-------------+------------------------------+
|         created_at|favourite_count|retweet_count|                          text|
+-------------------+---------------+-------------+------------------------------+
|2021-06-27 23:45:08|              1|            1|Lava Lei’s signature 100% K...|
|2021-06-27 23:38:04|              0|            0|@johnpavlovitz You might ha...|
+-------------------+---------------+-------------+------------------------------+
only showing top 2 rows



In [31]:
sdf = sdf.select('text', 'favourite_count', 'retweet_count', 'created_at')

In [32]:
sdf.limit(3).toPandas()

Unnamed: 0,text,favourite_count,retweet_count,created_at
0,Lava Lei’s signature 100% Kona Coffee featurin...,1,1,2021-06-27 23:45:08
1,@johnpavlovitz You might have already passed t...,0,0,2021-06-27 23:38:04
2,I wonder if villagers walk round the island an...,16,1,2021-06-27 22:35:32


In [36]:
# Extract First N rows in pyspark – Top N rows in pyspark using show() function
sdf.show(3, truncate=30)

+------------------------------+---------------+-------------+-------------------+
|                          text|favourite_count|retweet_count|         created_at|
+------------------------------+---------------+-------------+-------------------+
|Lava Lei’s signature 100% K...|              1|            1|2021-06-27 23:45:08|
|@johnpavlovitz You might ha...|              0|            0|2021-06-27 23:38:04|
|I wonder if villagers walk ...|             16|            1|2021-06-27 22:35:32|
+------------------------------+---------------+-------------+-------------------+
only showing top 3 rows



In [33]:
from pyspark.sql.functions import current_date, current_timestamp

data=[["1","02-01-2020 11 01 19 06"],["2","03-01-2019 12 01 19 406"],["3","03-01-2021 12 01 19 406"]]
df2=spark.createDataFrame(data,["id","input"])
df2.show(truncate=False)

df2.withColumn("current_date",current_date()) \
  .withColumn("current_timestamp",current_timestamp()) \
  .show(truncate=False)

+---+-----------------------+
|id |input                  |
+---+-----------------------+
|1  |02-01-2020 11 01 19 06 |
|2  |03-01-2019 12 01 19 406|
|3  |03-01-2021 12 01 19 406|
+---+-----------------------+

+---+-----------------------+------------+-----------------------+
|id |input                  |current_date|current_timestamp      |
+---+-----------------------+------------+-----------------------+
|1  |02-01-2020 11 01 19 06 |2021-06-28  |2021-06-28 21:24:40.028|
|2  |03-01-2019 12 01 19 406|2021-06-28  |2021-06-28 21:24:40.028|
|3  |03-01-2021 12 01 19 406|2021-06-28  |2021-06-28 21:24:40.028|
+---+-----------------------+------------+-----------------------+



In [35]:
df2.select(current_timestamp().alias("current_timestamp")
  ).show(truncate=False)

+----------------------+
|current_timestamp     |
+----------------------+
|2021-06-28 21:25:15.97|
|2021-06-28 21:25:15.97|
|2021-06-28 21:25:15.97|
+----------------------+



---
### save and read sdf without header

 Spark DataFrameWriter class provides a method csv() to save or write a DataFrame at a specified path on disk, this method takes a file path where you wanted to write a file and by default, it doesn’t write a header or column names.

 #### overwrite mode

In [37]:
sdf.write.format('csv').mode('overwrite').save('output_cof_island_sdf.csv', escape = '"')

In [38]:
%ls output_cof_island_sdf.csv

part-00000-6768db96-bdff-4212-8d65-7d462a9b10be-c000.csv  _SUCCESS
part-00001-6768db96-bdff-4212-8d65-7d462a9b10be-c000.csv


### append mode

In [None]:
# sdf.write.csv('output_cof_island_sdf.csv', mode = 'append')

In [39]:
from pyspark.sql.types import StructType, StructField, IntegerType, DateType, StringType, TimestampType
# ['text','favourite_count',	'retweet_count','created_at']
# DateType default format is yyyy-MM-dd 
# TimestampType default format is yyyy-MM-dd HH:mm:ss.SSSS
schema = StructType([
    StructField("text", StringType(), True),
    StructField("favourite_count", IntegerType(), True),
    StructField("retweet_count", IntegerType(), True),
    StructField("created_at", TimestampType(), True)
    ])


sdf_2 = spark.read.csv('output_cof_island_sdf.csv',schema=schema, header=False,escape = '"', multiLine=True) #, escape = '"',

In [40]:
# sdf.show(2, truncate =30)
sdf_2.limit(200).toPandas()

Unnamed: 0,text,favourite_count,retweet_count,created_at
0,"RT @jesse_martin_1: One Piece, Coffee Island #...",0,84,2021-06-27 05:11:07
1,"RT @jesse_martin_1: One Piece, Coffee Island #...",0,84,2021-06-27 05:08:36
2,beach hopping to speciality coffee shop hoppin...,1,0,2021-06-27 05:08:31
3,"RT @cathsherman: #Bahamas Fishing Pier, Great ...",0,41,2021-06-27 05:02:56
4,"@freshwaterpurl With books coffee and cinema, ...",7,0,2021-06-27 04:33:35
...,...,...,...,...
180,"RT @jesse_martin_1: One Piece, Coffee Island #...",0,84,2021-06-27 06:47:39
181,@TrekkieRN_NP Coffee shop might be good! But ...,1,0,2021-06-27 06:12:15
182,"RT @jesse_martin_1: One Piece, Coffee Island #...",0,84,2021-06-27 05:51:31
183,RT @SafemoonWarrior: #SAFEMOON🕵🏻‍♂️ \nCan some...,0,27,2021-06-27 05:39:40


In [41]:
# sdf_2 = sdf_2.sort(F.col('created_at'), ascending=True)
sdf_2 = sdf_2.sort(F.col('created_at').asc())
sdf_2.show(2,truncate=30)

+------------------------------+---------------+-------------+-------------------+
|                          text|favourite_count|retweet_count|         created_at|
+------------------------------+---------------+-------------+-------------------+
|RT @SafemoonWarrior: #SAFEM...|              0|           27|2021-06-26 00:42:05|
|Good Morning

Good Coffee m...|              0|            0|2021-06-26 00:50:02|
+------------------------------+---------------+-------------+-------------------+
only showing top 2 rows



In [43]:
# sdf_2 = sdf_2.sort(F.col('created_at').desc())
sdf_2 = sdf_2.sort(F.col('created_at'), ascending=False)
sdf_2.show(2,truncate=30)

+------------------------------+---------------+-------------+-------------------+
|                          text|favourite_count|retweet_count|         created_at|
+------------------------------+---------------+-------------+-------------------+
|Lava Lei’s signature 100% K...|              1|            1|2021-06-27 23:45:08|
|@johnpavlovitz You might ha...|              0|            0|2021-06-27 23:38:04|
+------------------------------+---------------+-------------+-------------------+
only showing top 2 rows



In [46]:
# Get First N rows in pyspark – Top N rows in pyspark using head() 
# function – (First 10 rows)
sdf_2.head(2)

[Row(text='Lava Lei’s signature 100% Kona Coffee featuring Kona beans grown exclusively on the Big Island of Hawaii is calling your name. To shop, visit https://t.co/BE9AQuLTbQ! ☕🌴☀️ #bayviewfarm https://t.co/Z58mxSAR38', favourite_count=1, retweet_count=1, created_at=datetime.datetime(2021, 6, 27, 23, 45, 8)),
 Row(text='@johnpavlovitz You might have already passed the point of no return on acquiring the taste for it If you’ve never had it, I would not be surprised if you found it incredibly bitter \n\nIf had to live on a deserted island and could only bring a few things with me, coffee would be the first grab', favourite_count=0, retweet_count=0, created_at=datetime.datetime(2021, 6, 27, 23, 38, 4))]

In [48]:
# Extract First row of dataframe in pyspark – using first() function
sdf_2.first()

Row(text='Lava Lei’s signature 100% Kona Coffee featuring Kona beans grown exclusively on the Big Island of Hawaii is calling your name. To shop, visit https://t.co/BE9AQuLTbQ! ☕🌴☀️ #bayviewfarm https://t.co/Z58mxSAR38', favourite_count=1, retweet_count=1, created_at=datetime.datetime(2021, 6, 27, 23, 45, 8))

In [57]:
from pyspark.sql.functions import col
sdf_2.select(col("created_at")).first()[0]

datetime.datetime(2021, 6, 27, 23, 45, 8)

In [58]:
sdf_2.limit(2).toPandas()

Unnamed: 0,text,favourite_count,retweet_count,created_at
0,Lava Lei’s signature 100% Kona Coffee featurin...,1,1,2021-06-27 23:45:08
1,@johnpavlovitz You might have already passed t...,0,0,2021-06-27 23:38:04


In [60]:
sdf_2.sort(F.col('created_at'), ascending=True).limit(2).toPandas()

Unnamed: 0,text,favourite_count,retweet_count,created_at
0,RT @SafemoonWarrior: #SAFEMOON🕵🏻‍♂️ \nCan some...,0,27,2021-06-26 00:42:05
1,Good Morning\n\nGood Coffee makes the day happ...,0,0,2021-06-26 00:50:02


In [61]:
sdf_2.toPandas().tail(2)

Unnamed: 0,text,favourite_count,retweet_count,created_at
183,Good Morning\n\nGood Coffee makes the day happ...,0,0,2021-06-26 00:50:02
184,RT @SafemoonWarrior: #SAFEMOON🕵🏻‍♂️ \nCan some...,0,27,2021-06-26 00:42:05


In [62]:
(sdf_2.count(), len(sdf_2.columns))

(185, 4)

---
### save a sdf to a csv file with header

#### The core syntax for writing data in Apache Spark

DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy( ...).save()

In [191]:
%ls output_cof_island_sdf.csv

part-00000-da49b260-a743-4e17-a788-16cd19b03077-c000.csv  _SUCCESS
part-00001-da49b260-a743-4e17-a788-16cd19b03077-c000.csv


In [63]:
#df = pd.DataFrame(output)
# header: str or bool, optional writes the names of columns as the first line. If None
#  is set, it uses the default value, false.

# DataFrameWriter.csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, charToEscapeQuoteEscaping=None, encoding=None, emptyValue=None, lineSep=None)
# .option("timestampFormat", "MM-dd-yyyy hh mm ss")
# sdf_2.write.csv('output_cof_island_sdf.csv', header = 'true', mode='overwrite')

# You can also use below
sdf_2.write.format("csv").mode("overwrite").options(header="true", escape = '"').save("output_cof_island_sdf_3.csv")

---
# Create a sdf from a csv file with header

In [64]:
# spark.read.csv(
#     "some_input_file.csv", 
#     header=True, 
#     mode="DROPMALFORMED", 
#     schema=schema
# )

# or

# (
#     spark.read
#     .schema(schema)
#     .option("header", "true")
#     .option("mode", "DROPMALFORMED")
#     .csv("some_input_file.csv")
# )

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, DateType, TimestampType

schema = StructType([
    StructField("text", StringType()),
    StructField("favourite_count", IntegerType()),
    StructField("retweet_count", IntegerType()),
    StructField("created_at", TimestampType())
])
# https://datascience.stackexchange.com/questions/12727/reading-csvs-with-new-lines-in-fields-with-spark
sdf_2 = spark.read.csv('output_cof_island_sdf_3.csv',schema=schema,header=True,  escape = '"',multiLine=True) #escape = '"',

In [65]:
type(sdf_2)
# pyspark.sql.dataframe.DataFrame
sdf.printSchema()

root
 |-- text: string (nullable = true)
 |-- favourite_count: long (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- created_at: timestamp (nullable = true)



In [66]:
sdf_2.count() 

185

In [67]:
sdf_2.columns

['text', 'favourite_count', 'retweet_count', 'created_at']

In [68]:
len(sdf_2.columns)

4

In [None]:
# Add this to the your code:

# import pyspark
# def spark_shape(self):
#     return (self.count(), len(self.columns))
# pyspark.sql.dataframe.DataFrame.shape = spark_shape
# Then you can do

# >>> df.shape()
# (10000, 10)

In [69]:
sdf_2.sort(F.col('created_at').desc()).limit(100).toPandas()

Unnamed: 0,text,favourite_count,retweet_count,created_at
0,Lava Lei’s signature 100% Kona Coffee featurin...,1,1,2021-06-27 23:45:08
1,@johnpavlovitz You might have already passed t...,0,0,2021-06-27 23:38:04
2,I wonder if villagers walk round the island an...,16,1,2021-06-27 22:35:32
3,"@parislord @FeathersOz @thebikingvet Yes, for ...",2,0,2021-06-27 22:14:58
4,"@Hold2LLC @TheEliKlein I'm stuck on an island,...",0,0,2021-06-27 22:07:46
...,...,...,...,...
95,"RT @cathsherman: #Bahamas Fishing Pier, Great ...",0,41,2021-06-27 05:02:56
96,"@freshwaterpurl With books coffee and cinema, ...",7,0,2021-06-27 04:33:35
97,"RT @jesse_martin_1: One Piece, Coffee Island #...",0,84,2021-06-27 04:28:01
98,"RT @jesse_martin_1: One Piece, Coffee Island #...",0,84,2021-06-27 04:19:11


In [70]:
sdf_2.toPandas().shape

(185, 4)

In [71]:
# select columns as in a pandas dataframe
sdf_2[['text','created_at']].show(2,truncate=30)

+------------------------------+-------------------+
|                          text|         created_at|
+------------------------------+-------------------+
|@ThebanMonk @Ty_in_TX @El__...|2021-06-27 02:50:12|
|***Very Strawberry &amp; Bl...|2021-06-26 14:27:32|
+------------------------------+-------------------+
only showing top 2 rows



In [72]:
sdf_2.select('text','created_at').describe().show()

+-------+--------------------+
|summary|                text|
+-------+--------------------+
|  count|                 185|
|   mean|                null|
| stddev|                null|
|    min|***Very Strawberr...|
|    max|🥥 Imagine a Boun...|
+-------+--------------------+



In [73]:
sdf_2[['text','created_at']].describe().show()

+-------+--------------------+
|summary|                text|
+-------+--------------------+
|  count|                 185|
|   mean|                null|
| stddev|                null|
|    min|***Very Strawberr...|
|    max|🥥 Imagine a Boun...|
+-------+--------------------+



In [74]:
sdf_2[sdf_2.columns].show(2, truncate=2)

+----+---------------+-------------+----------+
|text|favourite_count|retweet_count|created_at|
+----+---------------+-------------+----------+
|  @T|              2|            0|        20|
|  **|              0|            0|        20|
+----+---------------+-------------+----------+
only showing top 2 rows



---
### def sentiment_scores & sentiment_scoresUDF

ser-defined functions operate one-row-at-a-time, and thus suffer from high serialization and invocation overhead. As a result, many data pipelines define UDFs in Java and Scala and then invoke them from Python.

Pandas UDFs built on top of Apache Arrow bring you the best of both worlds—the ability to define low-overhead, high-performance UDFs entirely in Python.

#### Scalar Pandas UDFs

Scalar Pandas UDFs are used for vectorizing scalar operations. To define a scalar Pandas UDF, simply use @pandas_udf to annotate a Python function that takes in pandas.Series as arguments and returns another pandas.Series of the same size. Below we illustrate using two examples: Plus One and Cumulative Probability.


#### PyArrow versions

PyArrow is installed in Databricks Runtime. For information on the version of PyArrow available in each Databricks Runtime version, see the Databricks runtime release notes.
Supported SQL types

All `Spark SQL data types` are supported by Arrow-based conversion except `MapType`, `ArrayType` of `TimestampType`, and nested `StructType`. `StructType` is represented as a `pandas.DataFrame` instead of `pandas.Series`. `BinaryType` is supported only when PyArrow is equal to or higher than 0.10.0.

https://sparkbyexamples.com/spark/spark-sql-dataframe-data-types/


### Create DataFrame with schema

In [110]:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, DateType

simpleData = [["James ","","Smith","36636","M",3000],
    ["Michael ","Rose","","40288","M",4000],
    ["Robert ","","Williams","42114","M",4000],
    ["Maria ","Anne","Jones","39192","F",4000],
    ["Jen","Mary","Brown","","F",-1]]

simpleSchema = StructType([
    StructField("firstname",StringType()),
    StructField("middlename",StringType()),
    StructField("lastname",StringType()),
    StructField("id", StringType()),
    StructField("gender", StringType()),
    StructField("salary", IntegerType())
])

df = spark.createDataFrame(simpleData, schema=simpleSchema)
df.printSchema()
df.show()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|   James |          |   Smith|36636|     M|  3000|
| Michael |      Rose|        |40288|     M|  4000|
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+



In [568]:
import sys
from pyspark.sql.functions import udf
import json
# DoubleType, FloatType, ByteType, IntegerType, LongType, ShortType, ArrayType,StructField, StructType, Row
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyspark.sql.types as Types
from nltk.sentiment.vader import SentimentIntensityAnalyzer

# from pyspark.sql.types import DoubleType, IntegerType, StringType, DateType

# simpleData = [["James ","","Smith","36636","M",3000],
#     ["Michael ","Rose","","40288","M",4000],
#     ["Robert ","","Williams","42114","M",4000],
#     ["Maria ","Anne","Jones","39192","F",4000],
#     ["Jen","Mary","Brown","","F",-1]]

simpleSchema = StructType([
    StructField("neg",DoubleType()),
    StructField("pos",DoubleType()),
    StructField("compound",DoubleType()),
    StructField("neu", DoubleType())
])

# df = spark.createDataFrame(simpleData, schema=simpleSchema)
# df.printSchema()
# df.show()

# @pandas_udf('str',PandasUDFType.SCALAR)
#  For Types.MapType() - 2 required positional arguments: 'keyType' and 'valueType'
# @pandas_udf(simpleSchema, PandasUDFType.GROUPED_MAP)
# @pandas_udf(Types.MapType(Types.StringType(),Types.DoubleType()))
# @pandas_udf('struct<col1:string>')
# def sentiment_scores(sentance: str) -> dict :
# def sentiment_scores(pdf:pd.DataFrame) -> pd.DataFrame:
def sentiment_scores(pdf):
    # Create a SentimentIntensityAnalyzer object.
    sid = SentimentIntensityAnalyzer()
    # polarity_scores method of SentimentIntensityAnalyzer
    # oject gives a sentiment dictionary.
    # which contains pos, neg, neu, and compound scores.
    # ----
    # r = sid.polarity_scores(sentance)

    # pdf[r] = json.dumps(sid.polarity_scores(sentance))

    # json.dumps(sid.polarity_scores(s))
    # -----
    # simpleSchema.neg = r.neg
    # simpleSchema.pos = r.pos
    # simpleSchema.compound = r.compound
    # simpleSchema.neu = r.neu
    # s1 = pdf['text']
    # pdf.columns
    # pdf['s2'] = pdf.assign(json.dumps(sid.polarity_scores(s1)))
    # json.dumps(sid.polarity_scores(s)))
    
    # return r
    # return pd.DataFrame(simpleSchema)
    # 
    return json.dumps(sid.polarity_scores(pdf['text']))#sid.polarity_scores(s)
    # return pdf.assign(rating = json.dumps(sid.polarity_scores(pdf)))
    # You can optionally set the return type of your UDF. The default return type␣,→is StringType.
    # udffactorial_p = udf(factorial_p, LongType())

# sentiment_scoresUDF = udf(sentiment_scores, Types.MapType(Types.StringType(),Types.DoubleType()))

# sentiment_scores_pUDF = pandas_udf(sentiment_scores, returnType=Types.MapType(Types.StringType(),Types.DoubleType()))

# sentiment_scores_pUDF = pandas_udf(sentiment_scores, returnType=simpleSchema)

#  udf_obj = UserDefinedFunction(
#      42         f, returnType=returnType, name=None, evalType=evalType, deterministic=True)

# sid = SentimentIntensityAnalyzer()
# ---
# mydata = {"text":['Hello wanderful world', 'Hello bad boy'], "rank":[0,1]}
# pdf_mstring = pd.DataFrame(mydata)
pdf_mstring
# ------
# pdf_mstring['t'] = pdf_mstring.text.apply(lambda c1:json.dumps(sid.polarity_scores(c1)))

# pdf_mstring['text2'] =pdf_mstring.apply(lambda c1:sentiment_scores(c1['text']), axis=1)

pdf_mstring['text2'] =pdf_mstring.apply(lambda c1:sentiment_scores(c1), axis=1)

# -----
# pdf_mstring['text2'] = pdf_mstring[['text', 'rank']].apply(lambda row: row['text'], axis=1)

# pdf_mstring['text2'] = pdf_mstring[['text', 'rank']].apply(lambda row: sentiment_scores(row), axis=1)
# pdf_mstring.assign(s2 = SentimentIntensityAnalyzer().polarity_scores(pdf_mstring.text))
# json.dumps(sid.polarity_scores('Hello wanderful world'))
# print(type(sid.polarity_scores('Hello wanderful world')))

# sdf_test = sdf_2
# add an index column
# sdf_tes = sdf_test.withColumn('index', F.monotonically_increasing_id())
# sdf_tes
# sdf_tes_min = sdf_tes.sort('index').limit(2)
# sdf_tes_min.show(truncate=20)
# pyspark.sql.functions.pandas_udf(f=None, returnType=None, functionType=None)
# a Python native function that takes a pandas.DataFrame, and outputs a pandas.DataFrame.
ss = sdf_tes_min.select('text', 'index').groupby("text").applyInPandas(sentiment_scores, schema="rating string")
ss.show()
# 

# 
# pdf_mstring.drop('text2', axis=1, inplace = True)
# pdf_mstring

PythonException: 
  An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace.
Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 255, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 88, in dump_stream
    for batch in iterator:
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 248, in init_stream_yield_batches
    for series in iterator:
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 429, in mapper
    return f(keys, vals)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 175, in <lambda>
    return lambda k, v: [(wrapped(k, v), to_arrow_type(return_type))]
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 160, in wrapped
    result = f(pd.concat(value_series, axis=1))
  File "/opt/spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-568-e7156ae51678>", line 60, in sentiment_scores
  File "/opt/anaconda/envs/pyspark_env/lib/python3.7/site-packages/nltk/sentiment/vader.py", line 361, in polarity_scores
    self.constants.REGEX_REMOVE_PUNCTUATION)
  File "/opt/anaconda/envs/pyspark_env/lib/python3.7/site-packages/nltk/sentiment/vader.py", line 270, in __init__
    text = str(text.encode("utf-8"))
  File "/opt/anaconda/envs/pyspark_env/lib/python3.7/site-packages/pandas/core/generic.py", line 5141, in __getattr__
    return object.__getattribute__(self, name)
AttributeError: 'Series' object has no attribute 'encode'


In [506]:
sdf_tes_min.show()

+--------------------+---------------+-------------+-------------------+-----+
|                text|favourite_count|retweet_count|         created_at|index|
+--------------------+---------------+-------------+-------------------+-----+
|There's a lot hap...|              0|            0|2021-06-25 16:43:44|    0|
|There's a lot hap...|              0|            0|2021-06-25 16:43:44|    1|
+--------------------+---------------+-------------+-------------------+-----+



In [274]:
import pandas as pd
from pyspark.sql.functions import pandas_udf


df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>")

@pandas_udf("col1 string, col2 long")
def pandas_plus_len(
        s1: pd.Series, s2: pd.Series, pdf: pd.DataFrame) -> pd.DataFrame:
    # Regular columns are series and the struct column is a DataFrame.
    pdf['col2'] = s1 + s2.str.len() 
    return pdf  # the struct column expects a DataFrame to return

df.withColumn('new',pandas_plus_len("long_col", "string_col", "struct_col")).show()

+--------+----------+-----------------+--------------------+
|long_col|string_col|       struct_col|                 new|
+--------+----------+-----------------+--------------------+
|       1|  a string|[a nested string]|[a nested string, 9]|
+--------+----------+-----------------+--------------------+



In [358]:
from pyspark.sql.functions import pandas_udf, PandasUDFType,col


# @pandas_udf('double', PandasUDFType.SCALAR)
# def pandas_plus_one(v):
#     # `v` is a pandas Series
#     return v.add(1)  # outputs a pandas Series

@pandas_udf('string') #return value
def pandas_sum_two_columns(s1: pd.Series, s2: pd.Series) -> pd.Series:
    # return s.add(1)
    # pdf2 = pdf + s
    # return (s1 + s2).astype(str)
    return (s1 + s2).astype(str)



sdf_pandas_udf_ex1 = spark.range(10).withColumn('pdf',col("id")*2).withColumn('sum', pandas_sum_two_columns("id", "pdf"))
sdf_pandas_udf_ex1.show()
sdf_pandas_udf_ex1.agg({'pdf':'max'}).show()
print(sdf_pandas_udf_ex1.agg({'pdf':'max'}).collect()[0])
print(sdf_pandas_udf_ex1.agg({'pdf':'max'}).collect()[0][0])

'''
/opt/spark/python/pyspark/sql/pandas/functions.py:386: UserWarning: In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for pandas UDF instead of specifying pandas UDF type which will be deprecated in the future releases. See SPARK-28264 for more details.
#   "in the future releases. See SPARK-28264 for more details.", UserWarning)
'''

+---+---+---+
| id|pdf|sum|
+---+---+---+
|  0|  0|  0|
|  1|  2|  3|
|  2|  4|  6|
|  3|  6|  9|
|  4|  8| 12|
|  5| 10| 15|
|  6| 12| 18|
|  7| 14| 21|
|  8| 16| 24|
|  9| 18| 27|
+---+---+---+

+--------+
|max(pdf)|
+--------+
|      18|
+--------+

Row(max(pdf)=18)
18




In [330]:
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()

0    1
1    4
2    9
dtype: int64
+-------------------+
|multiply_func(x, x)|
+-------------------+
|                  1|
|                  4|
|                  9|
+-------------------+



In [332]:
# spark.range(10).withColumn('pdf',col("id").cast('string')).show()
spark.range(10).withColumn('pdf',col("id")*2).show()

+---+---+
| id|pdf|
+---+---+
|  0|  0|
|  1|  2|
|  2|  4|
|  3|  6|
|  4|  8|
|  5| 10|
|  6| 12|
|  7| 14|
|  8| 16|
|  9| 18|
+---+---+



In [367]:
import pandas as pd  
from pyspark.sql.functions import pandas_udf, ceil
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))  
def normalize(pdf):
    v = pdf.v
    # Assign new columns to a DataFrame
    return pdf.assign(normalized=(v - v.mean()) / v.std())
df.show()
df.groupby("id").applyInPandas(
    normalize, schema="id long, v double, normalized double").show() 

+---+----+
| id|   v|
+---+----+
|  1| 1.0|
|  1| 2.0|
|  2| 3.0|
|  2| 5.0|
|  2|10.0|
+---+----+

+---+----+-------------------+
| id|   v|         normalized|
+---+----+-------------------+
|  1| 1.0|-0.7071067811865475|
|  1| 2.0| 0.7071067811865475|
|  2| 3.0|-0.8320502943378437|
|  2| 5.0|-0.2773500981126146|
|  2|10.0| 1.1094003924504583|
+---+----+-------------------+



---
### def sentiment_scores & sentiment_scoresUDF

In [75]:
import sys
from pyspark.sql.functions import udf
# DoubleType, FloatType, ByteType, IntegerType, LongType, ShortType, ArrayType,StructField, StructType, Row
# from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyspark.sql.types as Types
from nltk.sentiment.vader import SentimentIntensityAnalyzer

def sentiment_scores(sentance: str) -> dict :
    # Create a SentimentIntensityAnalyzer object.
    sid = SentimentIntensityAnalyzer()
    # polarity_scores method of SentimentIntensityAnalyzer
    # oject gives a sentiment dictionary.
    # which contains pos, neg, neu, and compound scores.
    r = sid.polarity_scores(sentance)
    return r
    # You can optionally set the return type of your UDF. The default return type␣,→is StringType.
    # udffactorial_p = udf(factorial_p, LongType())

sentiment_scoresUDF = udf(sentiment_scores, Types.MapType(Types.StringType(),Types.DoubleType()))

---
### sdf

create a new column with sentiment_scores

In [78]:
from pyspark.sql.functions import col,sqrt,log,reverse
sdf_2 = sdf_2.withColumn("rating", sentiment_scoresUDF(sdf_2.text))

# df.groupby("id").applyInPandas(
#     normalize, schema="id long, v double, normalized double").show()
# sdf_udf = sdf_2.groupby("text").applyInPandas(
    # sentiment_scores, schema="text string, rating string")

# sdf_pudf.show(2,truncate=20)
# t.show()
# sdf_2.toPandas().style.set_properties(subset=['text'], **{'width': '300px'})

In [79]:
sdf_2.show(2, truncate=30)

+------------------------------+---------------+-------------+-------------------+------------------------------+
|                          text|favourite_count|retweet_count|         created_at|                        rating|
+------------------------------+---------------+-------------+-------------------+------------------------------+
|@ThebanMonk @Ty_in_TX @El__...|              2|            0|2021-06-27 02:50:12|[neg -> 0.0, pos -> 0.0, co...|
|***Very Strawberry &amp; Bl...|              0|            0|2021-06-26 14:27:32|[neg -> 0.032, pos -> 0.06,...|
+------------------------------+---------------+-------------+-------------------+------------------------------+
only showing top 2 rows



In [80]:
from pyspark.sql.functions import col,sqrt,log,reverse
sdf_2 = sdf_2.withColumn("rating", sentiment_scoresUDF(sdf_2.text))
# t.show()
sdf_2.limit(2).toPandas().style.set_properties(subset=['text'], **{'width': '300px'})

  Unsupported type in conversion to Arrow: MapType(StringType,DoubleType,true)
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


Unnamed: 0,text,favourite_count,retweet_count,created_at,rating
0,@ThebanMonk @Ty_in_TX @El__Bohemio @historybythpint @VeroniqueSemtex @TheRogue_Elf @heartsabustin @AllanKirkhart @i_ourpatio @vetjr89 @AltWoodstone @WorldhopperVive @Ajah1551 @GhostieMingo @beautatas @dieseldave97 @GentlemanRascal @meggymish @IAMISjp @blc3428 @BillHaggis @LadyDemosthenes @jr_bohl @freehorse8 @melly_stone @mddebm @John_Roberts__ @john_iamme @AmericanPurrl @KieranEleison @cgogolin @emmdub559 @rathernotsay @RantyAmyCurtis @Philly_Hoosier @Prissi_coffee @AWGecko @EarthSalter @MeerkatYitz And only one island instead of the whole world.,2,0,2021-06-27 02:50:12,"{'neg': 0.0, 'pos': 0.0, 'compound': 0.0, 'neu': 1.0}"
1,***Very Strawberry & Blue Cotton Candy*** Chocolate Ice Cream & Vanilla Ice Cream NSA Vanilla & Campfire Crush New York Cheesecake & Pineapple Dole Soft Serve Cold Brew Coffee Gelato & Cake Batter Sweet Coconut & Island Banana Cookie n' Cream & Caramel Sea Salt Gelato https://t.co/qT474mhoyy,0,0,2021-06-26 14:27:32,"{'neg': 0.032, 'pos': 0.06, 'compound': 0.34, 'neu': 0.907}"


In [60]:
sdf.toPandas().head(2).style.set_properties(subset=['text'], **{'width': '300px'})

Unnamed: 0,text,favourite_count,retweet_count,created_at,negative_nltk,positive_nltk,neutral_nltk,compound_nltk,rating
0,"@breakfasttv I received the Astrazeneca vaccine on April 9th. No, I do not regret it! I look forward to getting the Pfizer vaccine on the 28th of this month. I feel confident we will be fully protected. Even if it is unconventional. 😊",0,0,2021-06-20,0.049,0.199,0.752,0.7793,"{'neg': 0.049, 'pos': 0.199, 'compound': 0.7793, 'neu': 0.752}"
1,#Pfizer #AstraZeneca #Moderna #JohnsonAndJohnson What's up ! Safe and effective ! ⬇️ ⬇️ https://t.co/fvw1H8cmXT,3,5,2021-06-20,0.0,0.397,0.603,0.7639,"{'neg': 0.0, 'pos': 0.397, 'compound': 0.7639, 'neu': 0.603}"


In [61]:
sdf.toPandas().tail(2).style.set_properties(subset=['text'], **{'width': '300px'})

Unnamed: 0,text,favourite_count,retweet_count,created_at,negative_nltk,positive_nltk,neutral_nltk,compound_nltk,rating
6508,"RT @JohnRHewson: For the record, how many of our political leaders had Pfizer rather than AstraZeneca?",0,890,2021-06-19,0.0,0.0,1.0,0.0,"{'neg': 0.0, 'pos': 0.0, 'compound': 0.0, 'neu': 1.0}"
6509,RT @DrEricDing: 3) “After years of reading research on mixing vaccine types -- known as heterologous prime-boosting -- Morgon concluded tha…,0,55,2021-06-19,0.0,0.0,1.0,0.0,"{'neg': 0.0, 'pos': 0.0, 'compound': 0.0, 'neu': 1.0}"


In [62]:
sdf.show(2, truncate = 30)

+------------------------------+---------------+-------------+----------+-------------+-------------+------------+-------------+------------------------------+
|                          text|favourite_count|retweet_count|created_at|negative_nltk|positive_nltk|neutral_nltk|compound_nltk|                        rating|
+------------------------------+---------------+-------------+----------+-------------+-------------+------------+-------------+------------------------------+
|@breakfasttv I received the...|              0|            0|2021-06-20|        0.049|        0.199|       0.752|       0.7793|[neg -> 0.049, pos -> 0.199...|
|#Pfizer
#AstraZeneca
#Moder...|              3|            5|2021-06-20|          0.0|        0.397|       0.603|       0.7639|[neg -> 0.0, pos -> 0.397, ...|
+------------------------------+---------------+-------------+----------+-------------+-------------+------------+-------------+------------------------------+
only showing top 2 rows



In [49]:
# Returns the first num rows as a list of Row.
# This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.
sdf.head(2)

[Row(text='@breakfasttv I received the Astrazeneca vaccine on April 9th. No, I do not regret it! I look forward to getting the Pfizer vaccine on the 28th of this month. I feel confident we will be fully protected. Even if it is unconventional. 😊', favourite_count=0, retweet_count=0, created_at=datetime.date(2021, 6, 20), rating={'neg': 0.049, 'pos': 0.199, 'compound': 0.7793, 'neu': 0.752}),
 Row(text="#Pfizer\n#AstraZeneca\n#Moderna\n#JohnsonAndJohnson \n\nWhat's up !  Safe and effective !\n\n⬇️\n⬇️ https://t.co/fvw1H8cmXT", favourite_count=3, retweet_count=5, created_at=datetime.date(2021, 6, 20), rating={'neg': 0.0, 'pos': 0.397, 'compound': 0.7639, 'neu': 0.603})]

In [50]:
# Returns the last num rows as a list of Row.
sdf.tail(2)

[Row(text='RT @JohnRHewson: For the record, how many of our political leaders had Pfizer rather than AstraZeneca?', favourite_count=0, retweet_count=890, created_at=datetime.date(2021, 6, 19), rating={'neg': 0.0, 'pos': 0.0, 'compound': 0.0, 'neu': 1.0}),
 Row(text='RT @DrEricDing: 3) “After years of reading research on mixing vaccine types -- known as heterologous prime-boosting -- Morgon concluded tha…', favourite_count=0, retweet_count=55, created_at=datetime.date(2021, 6, 19), rating={'neg': 0.0, 'pos': 0.0, 'compound': 0.0, 'neu': 1.0})]

In [63]:
sdf.printSchema()

root
 |-- text: string (nullable = true)
 |-- favourite_count: integer (nullable = true)
 |-- retweet_count: integer (nullable = true)
 |-- created_at: date (nullable = true)
 |-- negative_nltk: double (nullable = true)
 |-- positive_nltk: double (nullable = true)
 |-- neutral_nltk: double (nullable = true)
 |-- compound_nltk: double (nullable = true)
 |-- rating: map (nullable = true)
 |    |-- key: string
 |    |-- value: double (valueContainsNull = true)



In [81]:
 from pyspark.sql.functions import col 

 sdf_2 = sdf_2.withColumn('negative_nltk', col('rating')['neg']) \
.withColumn('positive_nltk', col('rating')['pos']) \
.withColumn('neutral_nltk', col('rating')['neu']) \
.withColumn('compound_nltk',col('rating')['compound']) \
.drop('rating')


In [82]:
sdf_2.limit(2).toPandas().style.set_properties(subset=['text'], **{'width': '300px'})

Unnamed: 0,text,favourite_count,retweet_count,created_at,negative_nltk,positive_nltk,neutral_nltk,compound_nltk
0,@ThebanMonk @Ty_in_TX @El__Bohemio @historybythpint @VeroniqueSemtex @TheRogue_Elf @heartsabustin @AllanKirkhart @i_ourpatio @vetjr89 @AltWoodstone @WorldhopperVive @Ajah1551 @GhostieMingo @beautatas @dieseldave97 @GentlemanRascal @meggymish @IAMISjp @blc3428 @BillHaggis @LadyDemosthenes @jr_bohl @freehorse8 @melly_stone @mddebm @John_Roberts__ @john_iamme @AmericanPurrl @KieranEleison @cgogolin @emmdub559 @rathernotsay @RantyAmyCurtis @Philly_Hoosier @Prissi_coffee @AWGecko @EarthSalter @MeerkatYitz And only one island instead of the whole world.,2,0,2021-06-27 02:50:12,0.0,0.0,1.0,0.0
1,***Very Strawberry & Blue Cotton Candy*** Chocolate Ice Cream & Vanilla Ice Cream NSA Vanilla & Campfire Crush New York Cheesecake & Pineapple Dole Soft Serve Cold Brew Coffee Gelato & Cake Batter Sweet Coconut & Island Banana Cookie n' Cream & Caramel Sea Salt Gelato https://t.co/qT474mhoyy,0,0,2021-06-26 14:27:32,0.032,0.06,0.907,0.34


In [66]:
sdf.toPandas().tail(2).style.set_properties(subset=['text'], **{'width': '300px'})

Unnamed: 0,text,favourite_count,retweet_count,created_at,negative_nltk,positive_nltk,neutral_nltk,compound_nltk
6508,"RT @JohnRHewson: For the record, how many of our political leaders had Pfizer rather than AstraZeneca?",0,890,2021-06-19,0.0,0.0,1.0,0.0
6509,RT @DrEricDing: 3) “After years of reading research on mixing vaccine types -- known as heterologous prime-boosting -- Morgon concluded tha…,0,55,2021-06-19,0.0,0.0,1.0,0.0


In [85]:
sdf_from_list_of_rows = spark.createDataFrame(sdf_2.tail(2))

sdf_from_list_of_rows.show(truncate=15)

+---------------+---------------+-------------+---------------+-------------+-------------+------------+-------------+
|           text|favourite_count|retweet_count|     created_at|negative_nltk|positive_nltk|neutral_nltk|compound_nltk|
+---------------+---------------+-------------+---------------+-------------+-------------+------------+-------------+
|Good Morning...|              0|            0|2021-06-27 1...|          0.0|        0.266|       0.734|       0.4404|
|Good Morning...|              0|            0|2021-06-26 1...|          0.0|        0.266|       0.734|       0.4404|
+---------------+---------------+-------------+---------------+-------------+-------------+------------+-------------+



In [229]:
sdf_from_pdf = spark.createDataFrame(pdf)

sdf_from_list_of_rows.show(2)

+--------------------+---------------+-------------+----------+-------------+-------------+------------+-------------+
|                text|favourite_count|retweet_count|created_at|negative_nltk|positive_nltk|neutral_nltk|compound_nltk|
+--------------------+---------------+-------------+----------+-------------+-------------+------------+-------------+
|RT @JohnRHewson: ...|              0|          890|2021-06-19|          0.0|          0.0|         1.0|          0.0|
|RT @DrEricDing: 3...|              0|           55|2021-06-19|          0.0|          0.0|         1.0|          0.0|
+--------------------+---------------+-------------+----------+-------------+-------------+------------+-------------+



https://stackoverflow.com/questions/61608057/output-vader-sentiment-scores-in-columns-based-on-dataframe-rows-of-tweets

In [13]:
# Create the dataframe
# df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["letter", "name"])

# Function to get rows at `rownums`
# def getrows(df, rownums=None):
    # return df.rdd.zipWithIndex().filter(lambda x: x[1] in rownums).map(lambda x: x[0])

# Get rows at positions 0 and 2.
# getrows(df, rownums=[0, 2]).collect()

# Spark GroupBy and Aggregate Functions

`GroupBy` allows you to group rows together based off some column value, for example, you could group together sales data by the day the sale occured, or group repeast customer data based off the name of the customer.

Once you've performed the GroupBy operation you can use an aggregate function off that data.An `aggregate function` aggregates multiple rows of data into a single output, such as taking the sum of inputs, or counting the number of inputs.

**`Dataframe Aggregation`**

A set of methods for aggregations on a DataFrame:

    agg
    avg
    count
    max
    mean
    min
    pivot
    sum

https://hendra-herviawan.github.io/pyspark-groupby-and-aggregate-functions.html


In [86]:
sdf_2.columns

['text',
 'favourite_count',
 'retweet_count',
 'created_at',
 'negative_nltk',
 'positive_nltk',
 'neutral_nltk',
 'compound_nltk']

In [87]:
sdf_2.select('created_at')

DataFrame[created_at: timestamp]

```
sdf_agg_byDate =  sdf.groupBy('created_at').agg({'negative_nltk':'sum'}, {'positive_nltk':'sum'}, {'neutral_nltk':'sum'},	{'compound_nltk':'sum'})
import pyspark.sql.functions as f
from pyspark.sql.functions import col

sdf_agg_byDate =  sdf.groupBy('created_at').agg({'negative_nltk':'sum','positive_nltk':'sum','neutral_nltk':'sum','compound_nltk':'sum','created_at':'count'})

sdf_agg_byDate.count()
```

In [79]:
# sdf.groupBy('created_at').count().select('created_at', f.col('count').alias('tweets')).show()

### Convert Timestamp to Date

> Syntax: to_date(timestamp_column)

> Syntax: to_date(timestamp_column,format)

PySpark timestamp (TimestampType) consists of value in the format yyyy-MM-dd HH:mm:ss.SSSS and Date (DateType) format would be yyyy-MM-dd. Use to_date() function to truncate time from Timestamp or to convert the timestamp to date on DataFrame column.

In [96]:
sdf_2 = sdf_2.withColumn('created_at', to_date(F.col('created_at')))
sdf_2.show(2, truncate=20)

+--------------------+---------------+-------------+----------+-------------+-------------+------------+-------------+
|                text|favourite_count|retweet_count|created_at|negative_nltk|positive_nltk|neutral_nltk|compound_nltk|
+--------------------+---------------+-------------+----------+-------------+-------------+------------+-------------+
|@ThebanMonk @Ty_i...|              2|            0|2021-06-27|          0.0|          0.0|         1.0|          0.0|
|***Very Strawberr...|              0|            0|2021-06-26|        0.032|         0.06|       0.907|         0.34|
+--------------------+---------------+-------------+----------+-------------+-------------+------------+-------------+
only showing top 2 rows



In [98]:
sdf_2.toPandas().head(1).style.set_properties(subset=['text'], **{'width': '300px'})

Unnamed: 0,text,favourite_count,retweet_count,created_at,negative_nltk,positive_nltk,neutral_nltk,compound_nltk
0,@ThebanMonk @Ty_in_TX @El__Bohemio @historybythpint @VeroniqueSemtex @TheRogue_Elf @heartsabustin @AllanKirkhart @i_ourpatio @vetjr89 @AltWoodstone @WorldhopperVive @Ajah1551 @GhostieMingo @beautatas @dieseldave97 @GentlemanRascal @meggymish @IAMISjp @blc3428 @BillHaggis @LadyDemosthenes @jr_bohl @freehorse8 @melly_stone @mddebm @John_Roberts__ @john_iamme @AmericanPurrl @KieranEleison @cgogolin @emmdub559 @rathernotsay @RantyAmyCurtis @Philly_Hoosier @Prissi_coffee @AWGecko @EarthSalter @MeerkatYitz And only one island instead of the whole world.,2,0,2021-06-27,0.0,0.0,1.0,0.0


```
from pyspark.sql.functions import to_date
sdf_aggg = (sdf_2
.groupBy(to_date(F.col('created_at')).alias('created_at'))
.agg(F.count('created_at').alias('tweets'))
.show())
```

In [82]:
sdf_agg_byDate.show()

+----------+-------------------+------------------+-----------------+------------------+------------------+
|created_at| sum(compound_nltk)|sum(positive_nltk)|count(created_at)|sum(negative_nltk)| sum(neutral_nltk)|
+----------+-------------------+------------------+-----------------+------------------+------------------+
|2021-06-20| -36.28830000000009| 112.9250000000001|             2407|           133.614|2160.4989999999916|
|2021-06-19|-106.83589999999874| 139.2199999999993|             4103|176.49200000000073|3787.2440000000083|
+----------+-------------------+------------------+-----------------+------------------+------------------+



In [None]:
# sdf_agg_byDate = sdf_agg_byDate.withColumn("sum", col("sum(negative_nltk)")+col("science_score"))
# 	df1.show()

### groupBy and aggregate on multiple columns

In [112]:
exprs={}
cols = ['created_at',
 'negative_nltk',
 'positive_nltk',
 'neutral_nltk',
 'compound_nltk']
exprs = {x: "sum" for x in cols}
exprs['created_at'] = 'count'
exprs

{'created_at': 'count',
 'negative_nltk': 'sum',
 'positive_nltk': 'sum',
 'neutral_nltk': 'sum',
 'compound_nltk': 'sum'}

In [159]:
# sdf_agg_byDate = sdf.groupBy('created_at').agg({'negative_nltk':'sum'}, {'positive_nltk':'sum'}, {'neutral_nltk':'sum'}, {'compound_nltk':'sum'})
import pyspark.sql.functions as f 
from pyspark.sql.functions import col

sdf_agg_byDate = sdf_2.groupBy('created_at').agg({'negative_nltk':'sum','positive_nltk':'sum','neutral_nltk':'sum','compound_nltk':'sum','created_at':'count'}).withColumnRenamed('count(created_at)', 'tweets')

sdf_agg_byDate.count()

2

In [160]:
sdf_agg_byDate.limit(1).toPandas()

Unnamed: 0,created_at,sum(compound_nltk),sum(positive_nltk),tweets,sum(negative_nltk),sum(neutral_nltk)
0,2021-06-27,16.0636,7.043,115,1.753,106.202


In [None]:
# sdf.groupBy("department","state") \
#     .sum("salary","bonus") \
#     .show(false)

In [162]:
sdf_2.groupBy('created_at') \
    .agg(exprs).withColumnRenamed('count(created_at)', 'tweets') \
    .limit(1).toPandas()

Unnamed: 0,created_at,sum(compound_nltk),sum(positive_nltk),tweets,sum(negative_nltk),sum(neutral_nltk)
0,2021-06-27,16.0636,7.043,115,1.753,106.202


In [163]:
# How to delete columns in pyspark dataframe
columns_to_drop = ['sum(compound_nltk)', 'sum(positive_nltk)', 'sum(negative_nltk)', 'sum(neutral_nltk)']

sdf_agg_byDate = (sdf_agg_byDate
    .withColumn('compound_nltk', f.col('sum(compound_nltk)')/F.col('tweets'))
    .withColumn( 'positive_nltk',f.col('sum(positive_nltk)')/F.col('tweets'))
    .withColumn( 'negativen_ltk',f.col('sum(negative_nltk)')/F.col('tweets'))
    .withColumn( 'neutral_nltk',f.col('sum(neutral_nltk)')/F.col('tweets'))
    .drop(*columns_to_drop)
    )

sdf_agg_byDate.limit(1).toPandas()

Unnamed: 0,created_at,tweets,compound_nltk,positive_nltk,negativen_ltk,neutral_nltk
0,2021-06-27,115,0.139683,0.061243,0.015243,0.923496


In [164]:
sdf_agg_byDate.toPandas()
# .drop(*columns_to_drop)

Unnamed: 0,created_at,tweets,compound_nltk,positive_nltk,negativen_ltk,neutral_nltk
0,2021-06-27,115,0.139683,0.061243,0.015243,0.923496
1,2021-06-26,70,0.198057,0.102986,0.032043,0.864957


In [165]:
sdf.created_at[0]

Column<b'created_at[0]'>

In [84]:
# The below statement changes column 'count(created_at)' to 'tweets' on PySpark DataFrame. 
# sdf_agg_byDate = (sdf_agg_byDate
#     .withColumnRenamed('count(created_at)', 'tweets'))

In [166]:
# DataFrame.sort(*cols, **kwargs) - Returns a new DataFrame sorted by the specified column(s).
sdf_agg_byDate = sdf_agg_byDate.sort('created_at')

sdf_agg_byDate.toPandas()

Unnamed: 0,created_at,tweets,compound_nltk,positive_nltk,negativen_ltk,neutral_nltk
0,2021-06-26,70,0.198057,0.102986,0.032043,0.864957
1,2021-06-27,115,0.139683,0.061243,0.015243,0.923496


In [167]:
# 
sdf_agg_byDate.dtypes

[('created_at', 'date'),
 ('tweets', 'bigint'),
 ('compound_nltk', 'double'),
 ('positive_nltk', 'double'),
 ('negativen_ltk', 'double'),
 ('neutral_nltk', 'double')]

In [225]:
sdf_agg_byDate.head(1)

[Row(created_at=datetime.date(2021, 6, 19), tweets=4103, compound_nltk=-0.026038484036070862, positive_nltk=0.033931269802583305, negativen_ltk=0.043015354618571956, neutral_nltk=0.923042651718257)]

In [226]:
sdf_agg_byDate.limit(1).show()

+----------+------+--------------------+--------------------+--------------------+-----------------+
|created_at|tweets|       compound_nltk|       positive_nltk|       negativen_ltk|     neutral_nltk|
+----------+------+--------------------+--------------------+--------------------+-----------------+
|2021-06-19|  4103|-0.02603848403607...|0.033931269802583305|0.043015354618571956|0.923042651718257|
+----------+------+--------------------+--------------------+--------------------+-----------------+



In [223]:
# last, head, tail
sdf_agg_byDate.first()

Row(created_at=datetime.date(2021, 6, 19), tweets=4103, compound_nltk=-0.026038484036070862, positive_nltk=0.033931269802583305, negativen_ltk=0.043015354618571956, neutral_nltk=0.923042651718257)

In [141]:
since, until

(datetime.date(2021, 6, 26), datetime.date(2021, 6, 28))

### to_date() – Convert String to Date Format

Syntax: to_date(column,format)

Example: to_date(col("string_column"),"MM-dd-yyyy")

In [168]:
# from pyspark.sql.functions import lit
# dates = ("2021-06-26",  "2021-06-27")
# date_from, date_to = [to_date(lit(s)).cast(TimestampType()) for s in dates]
# (date_from, date_to)

from pyspark.sql.functions import lit
dates = ("2021-06-26",  "2021-06-27")
date_from, date_to = [to_date(lit(s)) for s in dates]
(date_from, date_to)

(Column<b"to_date('2021-06-26')">, Column<b"to_date('2021-06-27')">)

In [151]:
# import datetime, time 
# dates = ("2021-06-26 00:00:00",  "2021-06-27 00:00:00")
# # date_from, date_to = ("2021-06-26",  "2021-06-27")

# timestamps = (
#     time.mktime(datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S").timetuple())
#     for s in dates)
from pyspark.sql.functions import to_date


sdf_agg_byDate.filter(col('created_at') > date_from).show()


+----------+------+-------------------+-------------------+--------------------+-----------------+
|created_at|tweets|      compound_nltk|      positive_nltk|       negativen_ltk|     neutral_nltk|
+----------+------+-------------------+-------------------+--------------------+-----------------+
|2021-06-27|   115|0.13968347826086958|0.06124347826086956|0.015243478260869567|0.923495652173913|
+----------+------+-------------------+-------------------+--------------------+-----------------+



In [169]:
sdf_agg_byDate.filter(col('created_at') >= date_from).show()

+----------+------+-------------------+-------------------+--------------------+------------------+
|created_at|tweets|      compound_nltk|      positive_nltk|       negativen_ltk|      neutral_nltk|
+----------+------+-------------------+-------------------+--------------------+------------------+
|2021-06-26|    70|0.19805714285714288|0.10298571428571429|0.032042857142857144|0.8649571428571429|
|2021-06-27|   115|0.13968347826086958|0.06124347826086956|0.015243478260869567| 0.923495652173913|
+----------+------+-------------------+-------------------+--------------------+------------------+



`sf = sf.filter(sf.my_col >= date_from).filter(sf.my_col <= date_to)
sf.count()`

https://stackoverflow.com/questions/31407461/datetime-range-filter-in-pyspark-sql


https://sparkbyexamples.com/pyspark/pyspark-difference-between-two-dates-days-months-years/

In [174]:
sdf_agg_byDate.filter(sdf_agg_byDate['created_at'] >= date_from).filter(sdf_agg_byDate['created_at'] <= date_to).count()

2

In [176]:
sdf_agg_byDate.select('*', sdf_agg_byDate['created_at'].between(date_from, date_to)).count()

2

In [178]:
sdf_agg_byDate.select('*', sdf_agg_byDate['created_at'].between(date_from, date_to)).toPandas()

Unnamed: 0,created_at,tweets,compound_nltk,positive_nltk,negativen_ltk,neutral_nltk,((created_at >= to_date('2021-06-26')) AND (created_at <= to_date('2021-06-27')))
0,2021-06-26,70,0.198057,0.102986,0.032043,0.864957,True
1,2021-06-27,115,0.139683,0.061243,0.015243,0.923496,True


### Evaluate sentiment: 

- positive -> 1, 
- negative -> -1, 
- neutral -> 0

In [182]:
import sys
from pyspark.sql.functions import udf
# DoubleType, FloatType, ByteType, IntegerType, LongType, ShortType, ArrayType,StructField, StructType, Row
# from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyspark.sql.types as Types


def sentiment_eval(comp_score: float) -> int :

    # if compound_score > 0.05 => 1 i.e positive
    if comp_score > 0.05:
        return 1
    elif comp_score < 0.05:
        return -1
    else:
        return 0

sentiment_evalUDF = udf(sentiment_eval, IntegerType())


In [None]:
sdf_agg_byDate['sentiment'] = (sdf_agg_byDate['compound_nltk']
        .apply(lambda comp: 'positive' if comp > 0.05 else 'negative' if comp < -0.05 else 'neutral'))

In [267]:
start = time.time()
sdf_agg_byDate.withColumn('sentiment', sentiment_evalUDF(col('compound_nltk'))).toPandas()
end=time.time()
print(f'elapsed: {end-start}')

elapsed: 13.501638650894165


In [262]:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('float')
def sentiment_eval_pUDF(comp_score: pd.Series) -> pd.Series:
    s=[]
    # if compound_score > 0.05 => 1 i.e positive
    for elmnt in comp_score:
        if elmnt > 0.05:
            s.append(1)
        elif elmnt < 0.05:
            s.append(-1)
        else:
            s.append(0)
    return pd.Series(s)

In [261]:
x = pd.Series([0, 1, 2])
for el in x:
    print((el > 1))
print('*'*20)
print(x.any())   # because one element is zero
print(x.all())
  

False
False
True
********************
True
False


In [268]:
start = time.time()
sdf_agg_byDate.withColumn('sentiment', sentiment_eval_pUDF(col('compound_nltk'))).toPandas()
end=time.time()
print(f'elapsed: {end-start}')

# File "<ipython-input-248-44cbe12222e7>", line 7, in sentiment_eval_pUDF
#   File "/opt/anaconda/envs/pyspark_env/lib/python3.7/site-packages/pandas/core/generic.py", line 1330, in __nonzero__
#     f"The truth value of a {type(self).__name__} is ambiguous. "
# ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().

# https://stackoverflow.com/questions/36921951/truth-value-of-a-series-is-ambiguous-use-a-empty-a-bool-a-item-a-any-o

elapsed: 12.960050106048584


In [274]:
pdf_agg_byDate = sdf_agg_byDate.toPandas()

In [275]:
pdf_agg_byDate

Unnamed: 0,created_at,tweets,compound_nltk,positive_nltk,negativen_ltk,neutral_nltk
0,2021-06-26,70,0.198057,0.102986,0.032043,0.864957
1,2021-06-27,115,0.139683,0.061243,0.015243,0.923496


In [277]:

start = time.time()
pdf_agg_byDate['sentiment'] = (pdf_agg_byDate['compound_nltk']
        .apply(lambda comp: 'positive' if comp > 0.05 else 'negative' if comp < -0.05 else 'neutral'))
end=time.time()
print(f'elapsed: {end-start}')

elapsed: 0.030269145965576172


In [278]:
pdf_agg_byDate

Unnamed: 0,created_at,tweets,compound_nltk,positive_nltk,negativen_ltk,neutral_nltk,sentiment
0,2021-06-26,70,0.198057,0.102986,0.032043,0.864957,positive
1,2021-06-27,115,0.139683,0.061243,0.015243,0.923496,positive


In [246]:
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("col1 string, col2 long")
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    s3['col2'] = s1 + s2.str.len()
    return s3
# Create a Spark DataFrame that has three columns including a sturct column.
df = spark.createDataFrame(
[[1, "a string", ("a nested string",)]],
"long_col long, string_col string, struct_col struct<col1:string>")
df.printSchema()
df.show()


df = df.withColumn('derived', func("long_col", "string_col", "struct_col"))
# Create a Spark DataFrame that has three columns including a sturct column.

df.show()



root
 |-- long_col: long (nullable = true)
 |-- string_col: string (nullable = true)
 |-- struct_col: struct (nullable = true)
 |    |-- col1: string (nullable = true)

+--------+----------+-----------------+
|long_col|string_col|       struct_col|
+--------+----------+-----------------+
|       1|  a string|[a nested string]|
+--------+----------+-----------------+

+--------+----------+-----------------+--------------------+
|long_col|string_col|       struct_col|             derived|
+--------+----------+-----------------+--------------------+
|       1|  a string|[a nested string]|[a nested string, 9]|
+--------+----------+-----------------+--------------------+



In [192]:
from pyspark import SparkContext

### Best practices for successfully managing memory for Apache Spark applications on Amazon EMR

https://aws.amazon.com/blogs/big-data/best-practices-for-successfully-managing-memory-for-apache-spark-applications-on-amazon-emr/

### Set sparkContext and sqlContext

In [202]:
from pyspark.sql import SQLContext
sc = spark.sparkContext
sqlContext = SQLContext(sc)
# sqlContext.sql("get spark.sql.shuffle.partitions=10")

In [222]:
# sqlContext.sql("set spark.sql.shuffle.partitions=10")
# spark.default.parallelism is the default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set explicitly by the user. Note that spark.default.parallelism seems to only be working for raw RDD and is ignored when working with dataframes.

# If the task you are performing is not a join or aggregation and you are working with dataframes then setting these will not have any effect. You could, however, set the number of partitions yourself by calling df.repartition(numOfPartitions) (don't forget to assign it to a new val) in your code.
# sqlContext.sql("get spark.sql.shuffle.partitions")
spark.conf.get("spark.sql.shuffle.partitions")
# spark.conf.get("spark.default.parallelism")
# sc.getConf().getAll()
# sc.getConf("spark.default.parallelism")
# sc.conf.get("spark.driver.memory")
# spark.sparkContext.get(spark.sql.shuffle.partitions)#spark.sql.functions.partitions

configurations = spark.sparkContext.getConf().getAll()
for conf in configurations:
    print(conf[0],':',conf[1])


spark.eventLog.enabled : true
spark.app.id : local-1624903764057
spark.driver.host : spark-client
spark.driver.port : 36309
spark.repl.local.jars : file:///home/hadoopuser/.ivy2/jars/com.johnsnowlabs.nlp_spark-nlp_2.12-3.0.0.jar,file:///home/hadoopuser/.ivy2/jars/com.typesafe_config-1.3.0.jar,file:///home/hadoopuser/.ivy2/jars/org.rocksdb_rocksdbjni-6.5.3.jar,file:///home/hadoopuser/.ivy2/jars/com.amazonaws_aws-java-sdk-bundle-1.11.603.jar,file:///home/hadoopuser/.ivy2/jars/com.github.universal-automata_liblevenshtein-3.0.0.jar,file:///home/hadoopuser/.ivy2/jars/com.navigamez_greex-1.0.jar,file:///home/hadoopuser/.ivy2/jars/org.json4s_json4s-ext_2.12-3.5.3.jar,file:///home/hadoopuser/.ivy2/jars/com.johnsnowlabs.nlp_tensorflow-cpu_2.12-0.2.2.jar,file:///home/hadoopuser/.ivy2/jars/net.sf.trove4j_trove4j-3.0.3.jar,file:///home/hadoopuser/.ivy2/jars/com.google.code.findbugs_annotations-3.0.1.jar,file:///home/hadoopuser/.ivy2/jars/com.google.protobuf_protobuf-java-util-3.0.0-beta-3.jar,file

In [236]:
configurations = spark.sparkContext.getConf().getAll()
for conf in configurations:
    if conf[0] == 'spark.sql.shuffle.partitions':
        print(conf[0],':',conf[1])

print(f'spark.sql.shuffle.partitions = {spark.conf.get("spark.sql.shuffle.partitions")}')
# spark.sql.shuffle.partitions = 200
# spark.executor.id 

spark.sql.shuffle.partitions = 200


# perfplot

In [68]:
import numpy as np
import pandas as pd
import perfplot

perfplot.save(
    "out.png",
    setup=lambda n: pd.DataFrame(np.arange(n * 3).reshape(n, 3)),
    n_range=[2**k for k in range(25)],
    kernels=[
        lambda df: len(df.index),
        lambda df: df.shape[0],
        lambda df: df[df.columns[0]].count(),
    ],
    labels=["len(df.index)", "df.shape[0]", "df[df.columns[0]].count()"],
    xlabel="Number of rows",
)

Output()