In [0]:
#Import dependencies
import urllib
import urllib.parse  # explicit: `import urllib` alone does not guarantee the `parse` submodule is loaded

import tweepy as tw # Extract and interact with Twitter Data
from pyspark.sql import Row
from tqdm import tqdm # For loop visualizer

import config

# Check DBFS for AWS access credentials

In [0]:
# List the DBFS folder that is expected to hold the uploaded AWS credentials CSV
# (same listing as the `%fs ls` shorthand, written as an explicit dbutils call).
display(dbutils.fs.ls("/FileStore/tables/Databricks_AWS_Access"))

In [0]:
# Load the credentials CSV stored in DBFS into a Spark DataFrame.
# The first row of the file is treated as the header and fields are comma-separated.
FILE_TYPE = "csv"
FIRST_ROW_HEADER = "true"
DELIMITER = ","

aws_keys_reader = (
    spark.read
    .format(FILE_TYPE)
    .options(header=FIRST_ROW_HEADER, sep=DELIMITER)
)
aws_keys_df = aws_keys_reader.load("/FileStore/tables/Databricks_AWS_Access/Databricks_AWS_Access.csv")

In [0]:
# Read both keys into variables and URL-encode the secret access key.
# Both columns are fetched from the first data row with a single Spark action
# (previously each key triggered its own `.first()` job).
aws_keys_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if aws_keys_row is None:
    # `.first()` returns None for an empty DataFrame; fail with a clear message
    # instead of a TypeError when indexing None.
    raise ValueError("The AWS credentials CSV contains no data rows")

ACCESS_KEY = aws_keys_row['Access key ID']
SECRET_KEY = aws_keys_row['Secret access key']

# safe="" percent-encodes every reserved character (including "/"), so the
# secret can be embedded in a URI without being misparsed.
ENCODED_SECRET_KEY = urllib.parse.quote(SECRET_KEY, safe="")

# Create an AWS S3 Mount Point for writing data to the data lake

In [0]:
# Mount the S3 bucket into DBFS so it can be read/written through MOUNT_NAME.
AWS_S3_BUCKET = 'testbucket-acg-aamir'
MOUNT_NAME = '/mnt/testbucket-acg-aamir'
# The access key and URL-encoded secret are embedded in the source URI.
# NOTE(review): this places credentials in the mount source string; consider a
# secret scope or instance profile instead - confirm against workspace policy.
SOURCE_URI = 's3n://{0}:{1}@{2}'.format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)

# dbutils.fs.mount raises if the mount point already exists, which made this
# cell fail on every re-run; only mount when MOUNT_NAME is not mounted yet.
already_mounted = any(mount.mountPoint == MOUNT_NAME for mount in dbutils.fs.mounts())
if not already_mounted:
    dbutils.fs.mount(SOURCE_URI, MOUNT_NAME)

In [0]:
# List the contents of the mounted bucket to confirm the mount is reachable
# (same listing as the `%fs ls` shorthand, written as an explicit dbutils call).
display(dbutils.fs.ls('/mnt/testbucket-acg-aamir'))

# Connect to Twitter API and pull tweets associated with Bitcoin

In [0]:
# Twitter API consumer key and secret, read from the local `config` module
# so they are not hard-coded in the notebook.
consumer_api_key, consumer_api_secret = config.api_key, config.api_secret

In [0]:
# Build the OAuth handler from the consumer credentials and create the
# tweepy API client used by the search below.
auth = tw.OAuthHandler(
    consumer_key=consumer_api_key,
    consumer_secret=consumer_api_secret,
)
# wait_on_rate_limit=True is passed so the client waits when the rate limit is hit.
api = tw.API(auth, wait_on_rate_limit=True)

In [0]:
# Search definition: English #bitcoin tweets, excluding retweets, within a date window.
search_words = "#bitcoin -filter:retweets"
date_since = "2021-06-01"
date_until = "2021-06-30"
# Upper bound on how many tweets the cursor will yield (1,000,000).
MAX_TWEETS = 1000000

# The standard search endpoint has no `since` request parameter, so passing
# `since=` as a keyword did not restrict the results. The lower date bound has
# to be part of the query itself, via the `since:YYYY-MM-DD` search operator.
# `until` is a real request parameter and is still passed as a keyword.
search_query = f"{search_words} since:{date_since}"

# Lazy cursor over the search results; nothing is fetched until it is iterated.
tweets = tw.Cursor(api.search,
                   q=search_query,
                   lang="en",
                   until=date_until
                   ).items(MAX_TWEETS)

# Create tweets data frame from Twitter API

In [0]:
# Materialise the cursor into a list of Spark Row objects, one per tweet.
# tqdm wraps the cursor to show progress while the tweets are being pulled.
tweets_list = [
    Row(
        user_name=status.user.name,
        user_location=status.user.location,
        user_description=status.user.description,
        date=status.created_at,
        text=status.text,
        source=status.source,
        favorite_count=status.favorite_count,
        retweet_count=status.retweet_count,
    )
    for status in tqdm(tweets)
]

In [0]:
# Sanity check: report how many tweets were collected in the previous step.
retrieved_count = len(tweets_list)
print(f"New tweets retrieved: {retrieved_count}")

In [0]:
# Turn the collected Row objects into a Spark DataFrame (no explicit schema is given).
tweets_df = spark.createDataFrame(data=tweets_list)

In [0]:
# Preview the DataFrame: first 20 rows with long values truncated (the defaults, spelled out).
tweets_df.show(n=20, truncate=True)

# Write dataframe to S3

In [0]:
# Write the tweets DataFrame to the mounted S3 bucket as CSV.
# Fixes:
#  - the DataFrame built in this notebook is `tweets_df`; `spark_tweets_df` was
#    never defined and raised NameError.
#  - MOUNT_NAME already starts with '/mnt/', so prefixing another '/mnt/'
#    produced '/mnt//mnt/...' instead of a path inside the mount.
# A header row is written so the CSV files keep their column names.
tweets_df.write.save('{}/{}'.format(MOUNT_NAME, 'SparkTweets'), format='csv', header=True)

# Visualize data using Databricks and Spark SQL

In [0]:
# Register the tweets DataFrame as the temp view BITCOIN_TWEETS so it can be
# queried with Spark SQL, then show the 10 rows with the highest favorite_count.
tweets_df.createOrReplaceTempView("BITCOIN_TWEETS")

top_favorites_query = "SELECT user_name, favorite_count FROM BITCOIN_TWEETS ORDER BY favorite_count DESC LIMIT 10"
top_favorites_df = spark.sql(top_favorites_query)
display(top_favorites_df)

In [0]:
# Show the 10 rows of BITCOIN_TWEETS with the highest retweet_count.
top_retweets_query = "SELECT user_name, retweet_count FROM BITCOIN_TWEETS ORDER BY retweet_count DESC LIMIT 10"
top_retweets_df = spark.sql(top_retweets_query)
display(top_retweets_df)

In [0]:
# Count tweets per source and show the 5 sources with the most tweets.
tweets_by_source_query = "SELECT source AS SOURCE, count(*) AS COUNT_OF_TWEETS FROM BITCOIN_TWEETS GROUP BY SOURCE ORDER BY COUNT_OF_TWEETS desc LIMIT 5"
tweets_by_source_df = spark.sql(tweets_by_source_query)
display(tweets_by_source_df)