# **Bachelor Thesis: Twitter Text-Mining Tool**

## **Code Setup**

**Installing Dependencies**

In [0]:
!pip install pathlib
!pip install patool
!pip install internetarchive
!pip install elasticsearch

**Setup Variables**

In [0]:
# Define path names
path_to_files = '/content'  # working directory for downloaded and extracted data

# Name result files
rf_key_tweets = 'key_tweets.json'
rf_extracted_tweets = 'extracted_tweets.json'
rf_all_extracted_tweets = 'all_extracted_tweets.json'

# Define identifiers to download datasets
identifier_list = ['archiveteam-twitter-stream-2017-11', 'archiveteam-twitter-stream-2018-01', 'archiveteam-twitter-stream-2018-02']

# Define index name for Elasticsearch
es_index_name = 'tweet_results_2017-11'

# Define searchterms
# Matching is a case-sensitive substring test, so every spelling that should
# match has to be listed explicitly (capitalised and lowercase name, upper and
# lowercase ticker).
st_bitcoin = ("Bitcoin", "bitcoin", "BTC", "btc")
st_litecoin = ("Litecoin", "litecoin", "LTC", "ltc")
st_ripple = ("Ripple", "ripple", "XRP", "xrp")
searchterm = st_bitcoin

## **Code Pipeline**

**Download Data With Identifiers**

In [0]:
# This code cell downloads all datasets in the predefined identifier list to a predefined path

import internetarchive
from internetarchive import configure
from internetarchive import get_session
from internetarchive import get_item
from internetarchive import get_files

def downloader(identifier_list, path):
    """Download every TAR file of the given Internet Archive items into ``path``.

    Args:
        identifier_list: Internet Archive item identifiers to download from.
        path: Target directory. The TAR files are stored directly inside it
            (no per-item sub-directory), which is where ``extract_tar`` looks
            for ``*.tar`` files.

    Credentials are optional and are read from the environment variables
    ``IA_USER`` and ``IA_PASSWORD``; they must not be stored in the notebook.
    Without them the default (anonymous) session configuration is used.
    """
    import os  # local import: this cell only imports internetarchive

    print("1. Download files ...")
    print("     Number of Identifiers: ", len(identifier_list))

    # SECURITY: account credentials used to be hardcoded here. Anything that was
    # ever committed should be considered leaked and be changed.
    user = os.environ.get('IA_USER')
    pw = os.environ.get('IA_PASSWORD')
    if user and pw:
        configure(user, pw)
    s = get_session()

    os.makedirs(path, exist_ok=True)

    # Download all tar files from IA objects
    for ident in identifier_list:
        item = s.get_item(ident)
        file_names = [f.name for f in item.get_files(glob_pattern='*tar')]
        for file_name in file_names:
            # destdir + no_directory place the file at <path>/<file_name>
            item.download(files=file_name, destdir=path, no_directory=True)

    return

**Extract Datasets**

In [0]:
# This code cell extracts in 2 steps all datasets to a predefined path: 
# 1) Extract all TAR files to get BZ2 files
# 2) Extract all BZ2 files to get JSON files + analyze tweet text from datasets

import os
import glob
import patoolib
from pathlib import Path

# Unzips all tar files
def extract_tar(path):
    """Unpack every ``*.tar`` archive found directly in ``path``.

    Each archive is extracted into the directory it lives in and is deleted
    afterwards. Sub-directories of ``path`` are not searched.
    """
    print("2. Unzip TAR files ...")

    tar_pattern = path + '/*.tar'
    for archive in glob.glob(tar_pattern):
        target_dir = os.path.dirname(archive)
        patoolib.extract_archive(archive, outdir=target_dir)
        # The archive is no longer needed once its content is unpacked
        os.remove(os.fspath(archive))
    return

# Unzips all bz2 files from the folders
def extract_bz2(path, searchterm, rf_name):
    """Unpack all BZ2 archives below ``path`` and write the matching tweets.

    Args:
        path: Root directory that is searched recursively for ``*.bz2`` files.
        searchterm: Iterable of search terms passed on to
            ``analyze_tweet_text``.
        rf_name: Name of the result file. Every element returned by
            ``analyze_tweet_text`` is written as one JSON document per line.

    Each archive is extracted next to itself and deleted afterwards. The
    analysis runs once after all archives are unpacked; it scans the whole
    directory tree, so running it after every single archive would only repeat
    the same work.
    """
    import json  # local import: this cell does not import json itself

    print("3. Extracting BZ2 files ...")

    for archive in glob.glob(path + '/**/*.bz2', recursive=True):
        patoolib.extract_archive(archive, outdir=os.path.dirname(archive))
        os.remove(os.fspath(archive))

    # Analyze tweets with function analyze_tweet_text()
    tweet_array = analyze_tweet_text(path, searchterm)

    # Write key tweets to the result file named by the caller
    with open(rf_name, 'w', encoding="utf-8") as file:
        for it in tweet_array:
            file.write("%s\n" % json.dumps(it))
    return

**Analyze Tweets**

In [0]:
# This code cell filters JSON files for tweets that contain one of the terms in the predefined searchterm list and extracts only the key attributes from relevant tweets
# Function analyze_tweet_text() gets called in the function extract_bz2()

import os
import json
from pathlib import Path
from glob import iglob

def analyze_tweet_text(path, searchterms):
    """Collect the key attributes of all tweets that mention a search term.

    Every ``*.json`` file below ``path`` is read line by line, each non-empty
    line being one JSON document.

    Args:
        path: Root directory that is searched recursively for JSON files.
        searchterms: Iterable of strings; a tweet matches when its text
            contains at least one of them (case-sensitive substring test).

    Returns:
        A list with one entry per matching tweet. Each entry is the return
        value of ``extract_key_info`` for that single tweet, i.e. a list that
        holds one dict.
    """
    tweets_final = []  # Array for final Tweets

    print("4. Analyze Tweets ...")

    # Search for JSON files
    print("     Importing data ...")
    json_files = list(Path(path).glob('**/*.json'))  # List with all JSON Files

    # Start filter process
    print("     Filter process started ...")
    for js in json_files:
        # glob() results already include the root directory; joining them
        # with `path` again would duplicate it for relative paths.
        with open(js, encoding='utf-8') as json_file:
            for line in json_file:
                if not line.strip():
                    continue

                # A malformed line is reported and skipped instead of
                # aborting the whole run.
                try:
                    tweet_line = json.loads(line)
                except ValueError:
                    print("Decoding JSON has failed")
                    continue

                # 1. Filter: Check for deleted tweets (records without 'source')
                if not isinstance(tweet_line, dict) or 'source' not in tweet_line:
                    continue

                # 2. Filter: Check if tweet has more than 140 characters (truncated = true)
                if tweet_line.get('truncated'):
                    tweet_text = tweet_line['extended_tweet']['full_text']
                else:
                    tweet_text = tweet_line.get('text', '')

                # 3. Filter: Check if text contains any of the searchterms
                if any(s in tweet_text for s in searchterms):
                    tweets_final.append(extract_key_info([tweet_line]))
                    print(len(tweets_final))

    return tweets_final

**Extract Key Information**

In [0]:
# This code cell extracts only the key attributes from the key tweets
# Function extract_key_info() gets called in the function analyze_tweet_text()

import json

def extract_key_info(tweets):
    """Reduce raw tweet objects to a fixed set of key attributes.

    Args:
        tweets: List of raw tweet dicts.

    Returns:
        A list with one dict per input tweet containing the selected top-level
        attributes and a nested ``user`` dict. ``text`` holds the extended
        full text when the tweet is flagged as truncated, otherwise the plain
        text.

    Raises:
        KeyError: If a tweet lacks one of the copied attributes.
    """
    # User attributes that are copied 1:1, in output order
    user_fields = (
        'id', 'name', 'location', 'url', 'description', 'verified',
        'followers_count', 'friends_count', 'favourites_count',
        'statuses_count', 'created_at', 'utc_offset', 'time_zone',
        'geo_enabled', 'lang',
    )
    # Tweet attributes that follow the user block, in output order
    # ('entities' hashtags/urls are deliberately not part of the output)
    trailing_fields = (
        'geo', 'coordinates', 'place', 'quote_count', 'reply_count',
        'retweet_count', 'favorite_count', 'favorited', 'retweeted',
        'lang', 'timestamp_ms',
    )

    extracted_tweets = []

    for tweet in tweets:
        # Tweets with more than 140 characters carry the full text separately
        if tweet['truncated'] == True:
            full_text = tweet['extended_tweet']['full_text']
        else:
            full_text = tweet['text']

        # Create a new JSON-Object structure
        record = {
            "created_at": tweet['created_at'],
            "id": tweet['id'],
            "text": full_text,
            "source": tweet['source'],
            "user": {field: tweet['user'][field] for field in user_fields},
        }
        for field in trailing_fields:
            record[field] = tweet[field]

        extracted_tweets.append(record)

    return extracted_tweets

**Index Result Files To Elasticsearch**

In [0]:
# This code cell connects to Elasticsearch, creates an index with a predefined index name and fills it with the extracted tweets

from elasticsearch import Elasticsearch
import json

def index_to_es(index_name, json_file):
    """Create an Elasticsearch index and fill it with the tweets of a result file.

    Args:
        index_name: Name of the index to create and fill. An already existing
            index is reused (HTTP 400 on creation is ignored).
        json_file: Path of a UTF-8 text file with one JSON document per line.
            A line may hold a single tweet object or a list of tweet objects;
            lists are flattened so that every tweet becomes its own document.

    Documents are indexed with type ``tweet`` and their running number as id.
    The cluster address is hardcoded below.
    """
    print("7. Index Tweets to Elasticsearch ...")

    # Connect to the elastic cluster
    es = Elasticsearch(['https://search-my-ba-cluster-ckk6c27ovy52lt7e4pjk76yfuu.eu-central-1.es.amazonaws.com'])
    print(es)

    # Both created_at fields use the same date format and keyword sub-field
    def date_mapping():
        return {
            "type": "date",
            "format": "EEE MMM dd HH:mm:ss Z yyyy",
            "fields": {
                "keyword": {
                    "type": "keyword",
                    "ignore_above": 256
                }
            }
        }

    # Setup the settings and mapping
    settings = {
        "settings": {
            "index.mapping.total_fields.limit": 80000,
            "number_of_shards": 5,
            "number_of_replicas": 1
        },
        "mappings": {
            "tweet": {
                "properties": {
                    "created_at": date_mapping(),
                    "coordinates.coordinates": {
                        "type": "geo_point"
                    },
                    "place.bounding_box": {
                        "type": "geo_shape",
                        "coerce": "true",
                        "ignore_malformed": "true"
                    },
                    "user": {
                        "properties": {
                            "created_at": date_mapping()
                        }
                    }
                }
            }
        }
    }

    # Create a new index
    es.indices.create(index=index_name, ignore=400, body=settings)
    print("   Index created")

    # Fill created index with all extracted tweets
    tweets = []

    with open(json_file, encoding="utf-8") as source:
        for line in source:
            if not line.strip():
                continue
            record = json.loads(line)
            # The result file stores each tweet wrapped in a list; a list is
            # not a valid document body, so unwrap it here.
            if isinstance(record, list):
                tweets.extend(record)
            else:
                tweets.append(record)

    rejected = 0
    for i, tweet in enumerate(tweets):
        res = es.index(index=index_name, ignore=400, doc_type='tweet', id=i, body=tweet)
        # With ignore=400 a rejected document does not raise; the error body
        # is returned instead, so count it to make failures visible.
        if isinstance(res, dict) and 'error' in res:
            rejected += 1
    print("   Index filled")
    if rejected:
        print("   Warning: %d of %d documents were rejected" % (rejected, len(tweets)))
    return

## **Execute Code**

In [0]:
# Step 1: fetch the TAR archives of all configured identifiers
downloader(identifier_list=identifier_list, path=path_to_files)

# Step 2: unpack the TAR archives, which yields the BZ2 files
extract_tar(path=path_to_files)

# Step 3: unpack the BZ2 files to JSON and filter them for relevant tweets
# (extract_bz2() calls analyze_tweet_text() internally)
extract_bz2(path=path_to_files, searchterm=searchterm, rf_name=rf_key_tweets)

# Step 4: load the relevant tweets into Elasticsearch
index_to_es(index_name=es_index_name, json_file=rf_key_tweets)