In [53]:
import json
import psycopg2
from psycopg2 import sql
import cachetools
import pickle
import time
import os

class CombinedCache:
    def __init__(self, maxsize=10000, ttl=300):
        # 分别为用户和推文创建缓存
        self.user_cache = cachetools.TTLCache(maxsize, ttl)
        self.tweet_cache = cachetools.TTLCache(maxsize, ttl)
        self.timestamps = {}  # Track last access time for TTL

    def get_user(self, user_id):
        """Retrieve a user from the user cache."""
        return self.user_cache.get(user_id, None)

    def get_tweet(self, tweet_id):
        """Retrieve a tweet from the tweet cache."""
        tweet = self.tweet_cache.get(tweet_id, None)
        if tweet and (time.time() - self.timestamps.get(tweet_id, 0) < self.tweet_cache.ttl):
            return tweet
        return None

    def put_user(self, user_id, user_data):
        """Store or update a user in the user cache."""
        self.user_cache[user_id] = user_data

    def put_tweet(self, tweet_id, tweet_data):
        """Store or update a tweet in the tweet cache."""
        self.tweet_cache[tweet_id] = tweet_data
        self.timestamps[tweet_id] = time.time()

    def remove_user(self, user_id):
        """Remove a user from the user cache if it exists."""
        if user_id in self.user_cache:
            del self.user_cache[user_id]

    def remove_tweet(self, tweet_id):
        """Remove a tweet from the tweet cache if it exists."""
        if tweet_id in self.tweet_cache:
            del self.tweet_cache[tweet_id]
            del self.timestamps[tweet_id]

# 创建缓存实例
combined_cache = CombinedCache()

In [None]:
#example for datastructrue in firestore
{
  "id_str": "123456",
  "text": "This is a sample tweet",
  "created_at": "2021-01-01T12:00:00Z",
  "user_id": "78910",  // 只存储用户ID
  "retweets": {
    "count": 100,
    "users": [
      "user_id1",
      "user_id2"
    ]
  },
  "hashtags": ["#example", "#sample"]
}

In [54]:
import firebase_admin
from firebase_admin import credentials, firestore
import json
from datetime import datetime
import pytz

# 初始化 Firebase 应用
cred = credentials.Certificate('D:/Download/twitter-a3b9a-firebase-adminsdk-b9pvo-f8f057cf01.json')

db = firestore.client()

In [55]:

def parse_twitter_date(datestr):
    return datetime.strptime(datestr, '%a %b %d %H:%M:%S %z %Y').astimezone(pytz.utc)

def main():
    tweets_seen = set()  # 跟踪已处理的推文ID

    with open("corona-out-3", "r") as f1:
        batch = db.batch()  # 创建一个Firestore批处理操作
        count = 0  # 计数批处理中的文档数量

        for line in f1:
            line = line.strip()
            if not line or not line.startswith('{'):
                continue

            try:
                data = json.loads(line)
                tweet_id = data.get('id_str')

                if tweet_id in tweets_seen or tweet_id is None:
                    continue

                tweets_seen.add(tweet_id)
                user_id = data['user'].get('id_str')

                # 准备 Firestore 的推文数据
                tweet_data = {
                    'id_str': tweet_id,
                    'text': data['text'],
                    'created_at': data['created_at'],
                    'user_id': user_id,
                    'like_count': data.get('favorite_count', 0),
                    'retweet_count': data.get('retweet_count', 0),
                    'reply_count': data.get('reply_count', 0),
                    'hashtags': [tag['text'] for tag in data.get('entities', {}).get('hashtags', [])]
                }
                tweet_ref = db.collection('tweets').document(str(tweet_id))
                batch.set(tweet_ref, tweet_data)

                # 检查是否存在转发状态
                if 'retweeted_status' in data:
                    retweeted = data['retweeted_status']
                    retweet_data = {
                        'original_tweet_id': tweet_id,
                        'retweeter_id': retweeted['user']['id_str'],
                        'retweet_time': retweeted['created_at'],
                        'retweet_text': retweeted.get('extended_tweet', {}).get('full_text', retweeted['text']) if 'extended_tweet' in retweeted else retweeted['text']
                    }
                    retweet_ref = db.collection('retweets').document(tweet_id + '_' + retweeted['user']['id_str'])
                    batch.set(retweet_ref, retweet_data)

                count += 2  # 更新两个文档
                if count >= 400:  #limit500
                    batch.commit()  # 提交批处理
                    batch = db.batch()  # 重置批处理
                    count = 0

            except Exception as e:
                print(f"Error processing line: {e}")
                continue

        if count > 0:
            batch.commit()  # 提交最后一批剩余的数据

if __name__ == "__main__":
    main()


KeyboardInterrupt: 

In [46]:
import psycopg2 
#postgre storage
# 数据库连接和其他初始化
host = "localhost"
dbname = "twitter_1"
user = "postgres"
password = "123456"
port = "5432"
conn = psycopg2.connect(host=host, dbname=dbname, user=user, password=password, port=port)
conn.autocommit = True
cursor = conn.cursor()

# 创建用户表
create_table_query = """
CREATE TABLE IF NOT EXISTS user_table (
    id BIGINT PRIMARY KEY,
    id_str VARCHAR(50) NOT NULL,
    name VARCHAR(255),
    screen_name VARCHAR(255),
    location VARCHAR(255),
    url VARCHAR(500),
    description TEXT,
    translator_type VARCHAR(50),
    protected BOOLEAN,
    verified BOOLEAN,
    followers_count INT,
    friends_count INT,
    listed_count INT,
    favourites_count INT,
    statuses_count INT,
    created_at TIMESTAMP WITHOUT TIME ZONE,
    utc_offset INT,
    time_zone VARCHAR(50),
    geo_enabled BOOLEAN,
    lang VARCHAR(50),
    contributors_enabled BOOLEAN,
    is_translator BOOLEAN,
    profile_background_color VARCHAR(7),
    profile_background_image_url VARCHAR(500),
    profile_background_image_url_https VARCHAR(500),
    profile_background_tile BOOLEAN,
    profile_link_color VARCHAR(7),
    profile_sidebar_border_color VARCHAR(7),
    profile_sidebar_fill_color VARCHAR(7),
    profile_text_color VARCHAR(7),
    profile_use_background_image BOOLEAN,
    profile_image_url VARCHAR(500),
    profile_image_url_https VARCHAR(500),
    profile_banner_url VARCHAR(500),
    default_profile BOOLEAN,
    default_profile_image BOOLEAN,
    following BOOLEAN,
    follow_request_sent BOOLEAN,
    notifications BOOLEAN
);
"""

try:
    cursor.execute(create_table_query)
    print("Table created successfully")
except Exception as e:
    print(f"An error occurred: {e}")

# 读取文件并处理每行
file_path = "corona-out-3"  # 确保这是正确的文件路径
with open(file_path, "r") as file:
    for line_number, line in enumerate(file, 1):
        line = line.strip()
        if not line:
            print(f"Skipping empty line: {line_number}")
            continue
        
        try:
            data = json.loads(line)
            user_id_str = data['user']['id_str']
            user_data = data['user']
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON on line {line_number}: {e}")
            continue
        
        # 尝试从缓存获取用户信息
        cached_user = cache.get(user_id_str)
        if cached_user:
            print(f"User {user_id_str} already exists in cache, skipping insert.")
            continue
        
        # 查询数据库以确认用户是否存在
        cursor.execute("SELECT * FROM user_table WHERE id_str = %s;", (user_id_str,))
        result = cursor.fetchone()
        
        if not result:
            try:
                # 将用户数据插入数据库
                cursor.execute("""
                    INSERT INTO user_table (id, id_str, name, screen_name, location, url, description, protected, verified, followers_count, friends_count, listed_count, favourites_count, statuses_count, created_at, profile_background_color, profile_link_color, profile_sidebar_border_color, profile_sidebar_fill_color, profile_text_color, profile_use_background_image, profile_image_url, profile_image_url_https, profile_banner_url, default_profile, default_profile_image, following, follow_request_sent, notifications)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                """, (
                    user_data['id'], user_data['id_str'], user_data['name'], user_data['screen_name'],
                    user_data['location'], user_data.get('url'), user_data['description'],
                    user_data['protected'], user_data['verified'], user_data['followers_count'],
                    user_data['friends_count'], user_data['listed_count'], user_data['favourites_count'],
                    user_data['statuses_count'], user_data['created_at'], user_data.get('profile_background_color'),
                    user_data.get('profile_link_color'), user_data.get('profile_sidebar_border_color'),
                    user_data.get('profile_sidebar_fill_color'), user_data.get('profile_text_color'),
                    user_data.get('profile_use_background_image'), user_data.get('profile_image_url'),
                    user_data.get('profile_image_url_https'), user_data.get('profile_banner_url'),
                    user_data.get('default_profile'), user_data.get('default_profile_image'),
                    user_data.get('following'), user_data.get('follow_request_sent'), user_data.get('notifications')
                ))
                print(f"User {user_id_str} inserted into the database.")
                # 添加用户信息到缓存
                cache.put(user_id_str, user_data)
            except Exception as ex:
                print(f"Error inserting data on line {line_number}: {ex}")
        else:
            print(f"User {user_id_str} already exists in the database, skipping insert.")

# 程序结束前保存缓存状态
cache.checkpoint_cache()
# 关闭游标和连接
cursor.close()
conn.close()


Table created successfully
User 804046791348015107 already exists in the database, skipping insert.
Skipping empty line: 2
User 2242948745 already exists in the database, skipping insert.
Skipping empty line: 4
User 908326492718764034 already exists in the database, skipping insert.
Skipping empty line: 6
User 2929344220 already exists in the database, skipping insert.
Skipping empty line: 8
User 1206650133976408064 already exists in the database, skipping insert.
Skipping empty line: 10
User 1248123252 already exists in the database, skipping insert.
Skipping empty line: 12
User 50993809 already exists in the database, skipping insert.
Skipping empty line: 14
User 792325679354417152 already exists in the database, skipping insert.
Skipping empty line: 16
User 1091660129894838272 already exists in the database, skipping insert.
Skipping empty line: 18
User 375777294 already exists in the database, skipping insert.
Skipping empty line: 20
User 469083492 already exists in the database, s

KeyboardInterrupt: 

In [None]:
import json
import pytz
from datetime import datetime
import firebase_admin
from firebase_admin import credentials, firestore
from collections import defaultdict

class CombinedCache:
    def __init__(self, maxsize=10000, ttl=300):
        self.user_cache = defaultdict(lambda: None)
        self.tweet_cache = defaultdict(lambda: None)
        self.timestamps = defaultdict(lambda: 0)
        self.maxsize = maxsize
        self.ttl = ttl

    def get_tweet(self, tweet_id):
        tweet = self.tweet_cache[tweet_id]
        if tweet and (time.time() - self.timestamps[tweet_id] < self.ttl):
            return tweet
        return None

    def put_tweet(self, tweet_id, tweet_data):
        self.tweet_cache[tweet_id] = tweet_data
        self.timestamps[tweet_id] = time.time()

    def load_from_disk(self):
        # This function should implement loading the cache state from disk
        pass

    def save_to_disk(self):
        # This function should implement saving the cache state to disk
        pass

def parse_twitter_date(datestr):
    return datetime.strptime(datestr, '%a %b %d %H:%M:%S %z %Y').astimezone(pytz.utc)

def main():
    # Firestore and cache initialization
    cred = credentials.Certificate('path/to/your/firebase/key.json')
    firebase_admin.initialize_app(cred)
    db = firestore.client()
    cache = CombinedCache()
    cache.load_from_disk()

    tweets_seen = set()  # Track processed Tweet IDs
    with open("corona-out-3", "r") as f1:
        batch = db.batch()
        count = 0

        for line in f1:
            line = line.strip()
            if not line or not line.startswith('{'):
                continue

            try:
                data = json.loads(line)
                tweet_id = data.get('id_str')
                if tweet_id in tweets_seen or tweet_id is None or cache.get_tweet(tweet_id):
                    continue

                tweets_seen.add(tweet_id)
                user_id = data['user'].get('id_str')

                tweet_data = {
                    'id_str': tweet_id,
                    'text': data['text'],
                    'created_at': data['created_at'],
                    'user_id': user_id,
                    'like_count': data.get('favorite_count', 0),
                    'retweet_count': data.get('retweet_count', 0),
                    'reply_count': data.get('reply_count', 0),
                    'hashtags': [tag['text'] for tag in data.get('entities', {}).get('hashtags', [])]
                }
                cache.put_tweet(tweet_id, tweet_data)
                tweet_ref = db.collection('tweets').document(str(tweet_id))
                batch.set(tweet_ref, tweet_data)

                if 'retweeted_status' in data:
                    retweeted = data['retweeted_status']
                    retweet_data = {
                        'original_tweet_id': tweet_id,
                        'retweeter_id': retweeted['user']['id_str'],
                        'retweet_time': retweeted['created_at'],
                        'retweet_text': retweeted.get('extended_tweet', {}).get('full_text', retweeted['text']) if 'extended_tweet' in retweeted else retweeted['text']
                    }
                    retweet_ref = db.collection('retweets').document(tweet_id + '_' + retweeted['user']['id_str'])
                    batch.set(retweet_ref, retweet_data)

                count += 2
                if count >= 400:
                    batch.commit()
                    batch = db.batch()
                    count = 0

            except Exception as e:
                print(f"Error processing line: {e}")
                continue

        if count > 0:
            batch.commit()

    # Optionally save cache state to disk here
    cache.save_to_disk()

if __name__ == "__main__":
    main()


In [56]:
from datetime import datetime, timedelta
import pytz
import tkinter as tk
from tkinter import messagebox, ttk, scrolledtext
import psycopg2 

In [57]:
def calculate_relevance(tweet, query):
    # 基础分数：文本匹配度（简单示例，实际应用中可能需要更复杂的文本分析）
    text_score = 1 if query.lower() in tweet['text'].lower() else 0

    # 互动分数
    interaction_score = (tweet.get('like_count', 0) * 0.1 +
                         tweet.get('retweet_count', 0) * 0.2 +
                         tweet.get('reply_count', 0) * 0.1)

    # 用户影响力分数
    influence_score = tweet.get('user', {}).get('follower_count', 0) * 0.001  # 根据需要调整权重

    # 综合评分
    return text_score * 0.5 + interaction_score * 0.3 + influence_score * 0.2


In [59]:
from datetime import datetime, timedelta
import pytz
import tkinter as tk
from tkinter import messagebox, ttk, scrolledtext
import psycopg2 
def parse_twitter_date(datestr):
    # 示例 datestr: "Sat Apr 25 12:21:41 +0000 2020"
    return datetime.strptime(datestr, '%a %b %d %H:%M:%S %z %Y').astimezone(pytz.utc)

def search_tweets(query, search_type="text", start_date=None, end_date=None, sort_by="created_at", order="DESC"):
    db = firestore.client()
    tweets = db.collection('tweets')

    if search_type == "text":
        tweets = tweets.where('text', '>=', query).where('text', '<=', query + '\uf8ff')
    elif search_type == "hashtag":
        tweets = tweets.where('hashtags', 'array_contains', query)

    if start_date and end_date:
        # 为用户输入的日期添加 UTC 时区信息
        utc_zone = pytz.utc
        start_date = utc_zone.localize(datetime.strptime(start_date, "%Y-%m-%d"))
        end_date = utc_zone.localize(datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1))
        
        filtered_results = []
        for doc in tweets.stream():
            tweet = doc.to_dict()
            tweet_created_at = parse_twitter_date(tweet['created_at'])
            if start_date <= tweet_created_at <= end_date:
                filtered_results.append(tweet)
        results = filtered_results
    else:
        results = [doc.to_dict() for doc in tweets.stream()]

    tweets = tweets.order_by(sort_by, direction=firestore.Query.DESCENDING if order == "DESC" else firestore.Query.ASCENDING)
    try:
        results = [doc.to_dict() for doc in tweets.stream()]
    except Exception as e:
        messagebox.showerror("Error", f"Failed to fetch tweets: {e}")
        return []

    user_ids = {result['user_id'] for result in results}
    user_data = get_user_data(list(user_ids)) if user_ids else {}
    
    for result in results:
        result['user'] = user_data.get(result['user_id'], {})

    return results

def get_user_data(user_ids):
    if not user_ids:
        return {}

    try:
        conn = psycopg2.connect(
            host="localhost",
            dbname="twitter_1",
            user="postgres",
            password="123456",
            port="5432"
        )
        cursor = conn.cursor()
        format_strings = ','.join(['%s'] * len(user_ids))
        cursor.execute(f"SELECT id, name, screen_name, location, url FROM user_table WHERE id IN ({format_strings})", tuple(user_ids))
        user_data = {row[0]: {'name': row[1], 'screen_name': row[2], 'location': row[3], 'url': row[4]} for row in cursor.fetchall()}
        cursor.close()
        conn.close()
        return user_data
    except Exception as e:
        messagebox.showerror("Error", f"Failed to fetch user data: {e}")
        return {}

def perform_search():
    search_query = query_entry.get()
    search_type = search_type_combobox.get()
    start_date = start_date_entry.get()
    end_date = end_date_entry.get()
    results = search_tweets(search_query, search_type, start_date, end_date)
    results_display.delete(1.0, tk.END)
    
    for index, result in enumerate(results):
        user_info = result.get('user', {})
        result_text = f"{result['text']} - {user_info.get('name', 'Unknown')} - {result.get('created_at', 'No date')}\n"
        results_display.insert(tk.END, result_text)
        details_button = tk.Button(results_display, text="Details", command=lambda r=result: show_details(r))
        results_display.window_create(tk.END, window=details_button)
        results_display.insert(tk.END, "\n")

def show_details(result):
    user_info = result.get('user', {})
    details = f"Tweet ID: {result.get('id_str', 'N/A')}\nUser: {user_info.get('name', 'Unknown')}\nScreen Name: {user_info.get('screen_name', 'N/A')}\nLocation: {user_info.get('location', 'N/A')}\nText: {result['text']}\nHashtags: {', '.join(result.get('hashtags', []))}"
    messagebox.showinfo("Tweet Details", details)

root = tk.Tk()
root.title("Twitter Search App")

# 创建界面元素
query_label = tk.Label(root, text="Enter search query:")
query_label.pack()
query_entry = tk.Entry(root)
query_entry.pack()

search_type_label = tk.Label(root, text="Search by:")
search_type_label.pack()
search_type_combobox = ttk.Combobox(root, values=("text", "hashtag", "user"))
search_type_combobox.pack()
search_type_combobox.current(0)

start_date_label = tk.Label(root, text="Start Date (YYYY-MM-DD):")
start_date_label.pack()
start_date_entry = tk.Entry(root)
start_date_entry.pack()

end_date_label = tk.Label(root, text="End Date (YYYY-MM-DD):")
end_date_label.pack()
end_date_entry = tk.Entry(root)
end_date_entry.pack()

search_button = tk.Button(root, text="Search", command=perform_search)
search_button.pack()

results_display = scrolledtext.ScrolledText(root, wrap=tk.WORD, height=10, width=50)
results_display.pack(pady=20)

root.mainloop()

Exception in Tkinter callback
Traceback (most recent call last):
  File "E:\anacon\lib\tkinter\__init__.py", line 1921, in __call__
    return self.func(*args)
  File "C:\Users\dell\AppData\Local\Temp\ipykernel_19964\389532150.py", line 78, in perform_search
    results = search_tweets(search_query, search_type, start_date, end_date)
  File "C:\Users\dell\AppData\Local\Temp\ipykernel_19964\389532150.py", line 28, in search_tweets
    tweet_created_at = parse_twitter_date(tweet['created_at'])
  File "C:\Users\dell\AppData\Local\Temp\ipykernel_19964\389532150.py", line 8, in parse_twitter_date
    return datetime.strptime(datestr, '%a %b %d %H:%M:%S %z %Y').astimezone(pytz.utc)
  File "E:\anacon\lib\_strptime.py", line 568, in _strptime_datetime
    tt, fraction, gmtoff_fraction = _strptime(data_string, format)
  File "E:\anacon\lib\_strptime.py", line 349, in _strptime
    raise ValueError("time data %r does not match format %r" %
ValueError: time data '2021-01-01T12:00:00Z' does not ma

In [60]:
import pickle
from collections import OrderedDict

In [61]:
class LRUCache:
    def __init__(self, capacity: int = 100):
        self.cache = OrderedDict()
        self.capacity = capacity

    def get(self, key):
        if key not in self.cache:
            return None
        else:
            self.cache.move_to_end(key)
            return self.cache[key]

    def put(self, key, value):
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = value
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)

    def save_to_disk(self):
        with open('cache.pkl', 'wb') as f:
            pickle.dump(self.cache, f)

    def load_from_disk(self):
        try:
            with open('cache.pkl', 'rb') as f:
                self.cache = pickle.load(f)
        except FileNotFoundError:
            self.cache = OrderedDict()

In [51]:
cred = credentials.Certificate('D:/Download/twitter-a3b9a-firebase-adminsdk-b9pvo-f8f057cf01.json')
db = firestore.client()

# Global Cache
cache = LRUCache()
cache.load_from_disk()

def parse_twitter_date(datestr):
    timezone = pytz.timezone("UTC")  # Define your timezone
    return datetime.strptime(datestr, '%a %b %d %H:%M:%S %z %Y').astimezone(timezone)

def calculate_relevance(tweet, query):
    text_score = query.lower() in tweet['text'].lower()
    interaction_score = tweet.get('like_count', 0) + tweet.get('retweet_count', 0) + tweet.get('reply_count', 0)
    return text_score * 0.5 + interaction_score * 0.5

def get_user_data(user_ids, use_cache=True):
    results = {}
    missing_ids = []
    for user_id in user_ids:
        if use_cache:
            data = cache.get(user_id)
            if data:
                results[user_id] = data
                continue
        missing_ids.append(user_id)
    
    if missing_ids:
        try:
            conn = psycopg2.connect(host="localhost", dbname="twitter_1", user="postgres", password="123456", port="5432")
            cursor = conn.cursor()
            format_strings = ','.join(['%s'] * len(missing_ids))
            cursor.execute(f"SELECT id, name, screen_name, location, url FROM user_table WHERE id IN ({format_strings})", tuple(missing_ids))
            for row in cursor.fetchall():
                user_data = {'name': row[1], 'screen_name': row[2], 'location': row[3], 'url': row[4]}
                results[row[0]] = user_data
                if use_cache:
                    cache.put(row[0], user_data)
        except Exception as e:
            messagebox.showerror("Error", f"连接数据库失败: {e}")
        finally:
            if conn:
                conn.close()
    
    return results

def get_retweet_details(tweet_id):
    retweets = db.collection('retweets').where('original_tweet_id', '==', tweet_id).stream()
    user_ids = [retweet.to_dict()['retweeter_id'] for retweet in retweets]
    users_info = get_user_data(user_ids)
    retweet_info = []
    for user_id in user_ids:
        if user_id in users_info:
            user_info = users_info[user_id]
            retweet_info.append(f"Retweeted by: {user_info['name']} at {user_info.get('retweet_time', 'Unknown')}")
        else:
            retweet_info.append("Retweet details unavailable")
    return "\n".join(retweet_info)

def search_tweets(query, search_type="text", start_date=None, end_date=None, use_cache=True):
    tweets = db.collection('tweets')
    if search_type == "text":
        tweets = tweets.where('text', '>=', query).where('text', '<=', query + '\uf8ff')
    elif search_type == "hashtag":
        tweets = tweets.where('hashtags', 'array_contains', query)
    results = []
    if start_date and end_date:
        utc_zone = pytz.utc
        start_date = utc_zone.localize(datetime.strptime(start_date, "%Y-%m-%d"))
        end_date = utc_zone.localize(datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1))
        for doc in tweets.stream():
            tweet = doc.to_dict()
            tweet_created_at = parse_twitter_date(tweet['created_at'])
            if start_date <= tweet_created_at <= end_date:
                tweet['relevance_score'] = calculate_relevance(tweet, query)
                results.append(tweet)
        results.sort(key=lambda x: x['relevance_score'], reverse=True)
    else:
        results = [doc.to_dict() for doc in tweets.stream()]
    user_ids = {result['user_id'] for result in results}
    users_info = get_user_data(list(user_ids), use_cache=use_cache)  # 使用use_cache参数
    for result in results:
        result['user'] = users_info.get(result['user_id'], {})
        result['retweet_details'] = get_retweet_details(result['id_str'], use_cache=use_cache)  # 确保这里也传递了use_cache参数
    return results


def perform_search():
    search_query = query_entry.get()
    search_type = search_type_combobox.get()
    start_date = start_date_entry.get()
    end_date = end_date_entry.get()
    results = search_tweets(search_query, search_type, start_date, end_date)
    results_display.delete(1.0, tk.END)
    if not results:
        messagebox.showinfo("Search Result", "No tweets found matching your criteria.")
        return
    for index, result in enumerate(results):
        user_info = result.get('user', {})
        result_text = f"{result['text']} - {user_info.get('name', 'Unknown')} - {result.get('created_at', 'No date')} - Retweets: {result.get('retweet_details', 'None')}\n"
        results_display.insert(tk.END, result_text)
        details_button = tk.Button(results_display, text="Details", command=lambda r=result: show_details(r))
        results_display.window_create(tk.END, window=details_button)
        results_display.insert(tk.END, "\n")

def show_details(result):
    user_info = result.get('user', {})
    retweet_details = result.get('retweet_details', 'No retweet details available')
    details = f"Tweet ID: {result.get('id_str', 'N/A')}\nUser: {user_info.get('name', 'Unknown')}\nScreen Name: {user_info.get('screen_name', 'N/A')}\nLocation: {user_info.get('location', 'N/A')}\nText: {result['text']}\nHashtags: {', '.join(result.get('hashtags', []))}\nRetweet Details: {retweet_details}"
    messagebox.showinfo("Tweet Details", details)

In [52]:

# UI initialization and main loop
root = tk.Tk()
root.title("Twitter Search App")
query_label = tk.Label(root, text="Enter search query:")
query_label.pack()
query_entry = tk.Entry(root)
query_entry.pack()
search_type_label = tk.Label(root, text="Search by:")
search_type_label.pack()
search_type_combobox = ttk.Combobox(root, values=("text", "hashtag", "user"))
search_type_combobox.pack()
search_type_combobox.current(0)
start_date_label = tk.Label(root, text="Start Date (YYYY-MM-DD):")
start_date_label.pack()
start_date_entry = tk.Entry(root)
start_date_entry.pack()
end_date_label = tk.Label(root, text="End Date (YYYY-MM-DD):")
end_date_label.pack()
end_date_entry = tk.Entry(root)
end_date_entry.pack()
search_button = tk.Button(root, text="Search", command=perform_search)
search_button.pack()
results_display = scrolledtext.ScrolledText(root, wrap=tk.WORD, height=10, width=50)
results_display.pack(pady=20)

def on_closing():
    cache.save_to_disk()
    root.destroy()

root.protocol("WM_DELETE_WINDOW", on_closing)
root.mainloop()

Exception in Tkinter callback
Traceback (most recent call last):
  File "E:\anacon\lib\tkinter\__init__.py", line 1921, in __call__
    return self.func(*args)
  File "C:\Users\dell\AppData\Local\Temp\ipykernel_19964\2015190796.py", line 93, in perform_search
    results = search_tweets(search_query, search_type, start_date, end_date)
  File "C:\Users\dell\AppData\Local\Temp\ipykernel_19964\2015190796.py", line 84, in search_tweets
    result['retweet_details'] = get_retweet_details(result['id_str'], use_cache=use_cache)  # 确保这里也传递了use_cache参数
TypeError: get_retweet_details() got an unexpected keyword argument 'use_cache'
Exception in Tkinter callback
Traceback (most recent call last):
  File "E:\anacon\lib\tkinter\__init__.py", line 1921, in __call__
    return self.func(*args)
  File "C:\Users\dell\AppData\Local\Temp\ipykernel_19964\2015190796.py", line 93, in perform_search
    results = search_tweets(search_query, search_type, start_date, end_date)
  File "C:\Users\dell\AppData\Loc

In [None]:
#test

In [42]:
from collections import OrderedDict
from datetime import datetime, timedelta

class LRUCache:
    def __init__(self, capacity=100, ttl=3600):  # ttl in seconds
        self.cache = OrderedDict()
        self.capacity = capacity
        self.ttl = ttl

    def get(self, key):
        if key not in self.cache:
            return None
        if (datetime.now() - self.cache[key][1]) > timedelta(seconds=self.ttl):
            self.cache.pop(key)  # Remove expired cache item
            return None
        self.cache.move_to_end(key)  # Move to end to show it was recently used
        return self.cache[key][0]

    def put(self, key, value):
        if key in self.cache:
            self.cache.move_to_end(key)  # Move to end to show it was recently used
        self.cache[key] = (value, datetime.now())
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)  # Remove least recently used item

    def clear_expired_items(self):
        current_time = datetime.now()
        keys_to_remove = [key for key, value in self.cache.items() if (current_time - value[1]) > timedelta(seconds=self.ttl)]
        for key in keys_to_remove:
            self.cache.pop(key)



In [43]:
import firebase_admin
from firebase_admin import credentials, firestore
import psycopg2
import json
from datetime import datetime
import pytz

# Firebase initialization
cred = credentials.Certificate('D:/Download/twitter-a3b9a-firebase-adminsdk-b9pvo-f8f057cf01.json')

db = firestore.client()

# PostgreSQL connection setup
conn = psycopg2.connect(
    host="localhost",
            dbname="twitter_1",
            user="postgres",
            password="123456",
            port="5432"
)
cursor = conn.cursor()

# Initialize cache
cache = LRUCache(capacity=100, ttl=3600)  # Customize as needed


In [21]:
def parse_twitter_date(datestr):
    return datetime.strptime(datestr, '%a %b %d %H:%M:%S %z %Y').astimezone(pytz.utc)

def process_tweet_line(line):
    try:
        data = json.loads(line)
        tweet_id = data.get('id_str')

        if tweet_id is None or cache.get(tweet_id):  # Skip if tweet is already processed
            return

        user_id = data['user'].get('id_str')
        tweet_data = {
            'id_str': tweet_id,
            'text': data['text'],
            'created_at': parse_twitter_date(data['created_at']),
            'user_id': user_id,
            'like_count': data.get('favorite_count', 0),
            'retweet_count': data.get('retweet_count', 0),
            'reply_count': data.get('reply_count', 0),
            'hashtags': [tag['text'] for tag in data.get('entities', {}).get('hashtags', [])]
        }
        tweet_ref = db.collection('tweets').document(str(tweet_id))
        tweet_ref.set(tweet_data)

        cache.put(tweet_id, tweet_data)  # Cache the processed tweet

        # Store user info in PostgreSQL if not already cached
        if not cache.get(user_id):
            user_data = data['user']
            cursor.execute("""
                INSERT INTO users (id, id_str, name, screen_name, location, url, description, protected, verified, followers_count, friends_count, listed_count, favourites_count, statuses_count, created_at) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
            """, (
                user_data['id'], user_data['id_str'], user_data['name'], user_data['screen_name'],
                user_data['location'], user_data.get('url'), user_data['description'],
                user_data['protected'], user_data['verified'], user_data['followers_count'],
                user_data['friends_count'], user_data['listed_count'], user_data['favourites_count'],
                user_data['statuses_count'], parse_twitter_date(user_data['created_at'])
            ))
            conn.commit()
            cache.put(user_id, user_data)  # Cache the user data

    except json.JSONDecodeError as e:
        print(f"Error processing line: {e}")


In [44]:
#app
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
import datetime

# Firestore 查询函数，支持字符串、主题标签、用户搜索，包括时间范围和排序
def search_tweets(db, query, search_type="text", start_date=None, end_date=None):
    query_result = db.collection('tweets')
    if search_type == "text":
        query_result = query_result.where('text', '==', query)
    elif search_type == "hashtag":
        query_result = query_result.where('hashtags', 'array_contains', query)
    elif search_type == "user":
        query_result = query_result.where('user_id', '==', query)

    if start_date:
        query_result = query_result.where('created_at', '>=', start_date)
    if end_date:
        query_result = query_result.where('created_at', '<=', end_date)

    # 应用排序（例如按转发数排序）
    query_result = query_result.order_by('retweet_count', direction=firestore.Query.DESCENDING)
    return query_result.stream()


In [None]:
def perform_search():
    query = query_entry.get()
    search_type = search_type_combobox.get()
    start_date = start_date_entry.get()
    end_date = end_date_entry.get()

    try:
        start_date = datetime.datetime.strptime(start_date, '%Y-%m-%d') if start_date else None
        end_date = datetime.datetime.strptime(end_date, '%Y-%m-%d') if end_date else None
    except ValueError:
        messagebox.showerror("Error", "Incorrect date format. Please use YYYY-MM-DD.")
        return

    results = search_tweets(db, query, search_type, start_date, end_date)
    results_display.delete('1.0', tk.END)  # 清空现有结果

    for doc in results:
        tweet = doc.to_dict()
        display_text = f"Tweet: {tweet['text']} (Author: {tweet['user_id']}, Posted: {tweet['created_at']}, Retweets: {tweet['retweet_count']})\n"
        results_display.insert(tk.END, display_text)

# 创建主窗口
root = tk.Tk()
root.title("Twitter Search App")

# 添加输入字段
query_label = tk.Label(root, text="Enter search query:")
query_label.pack()
query_entry = tk.Entry(root)
query_entry.pack()

search_type_label = tk.Label(root, text="Search by:")
search_type_label.pack()
search_type_combobox = ttk.Combobox(root, values=["text", "hashtag", "user"])
search_type_combobox.pack()
search_type_combobox.current(0)

start_date_label = tk.Label(root, text="Start Date (YYYY-MM-DD):")
start_date_label.pack()
start_date_entry = tk.Entry(root)
start_date_entry.pack()

end_date_label = tk.Label(root, text="End Date (YYYY-MM-DD):")
end_date_label.pack()
end_date_entry = tk.Entry(root)
end_date_entry.pack()

search_button = tk.Button(root, text="Search", command=perform_search)
search_button.pack()

results_display = scrolledtext.ScrolledText(root, wrap=tk.WORD, height=10, width=50)
results_display.pack(pady=20)

root.mainloop()


In [24]:
def test_search_performance(query, search_type):
    import time

    start_time = time.time()
    results = search_tweets(db, query, search_type)
    results = list(results)  # Force the query to run
    end_time = time.time()

    print(f"Query time: {end_time - start_time} seconds")


In [35]:
def test_search_by_hashtag(db):
    results = search_tweets(db, "#covid", "hashtag")
    for doc in results:
        print(doc.to_dict())

test_search_by_hashtag(db)

def test_search_performance(db, query, search_type):
    start_time = time.time()
    results = search_tweets(db, query, search_type)
    results = list(results)  # 强制运行查询
    end_time = time.time()
    print(f"Query time without cache: {end_time - start_time} seconds")

    # 再次测试以评估缓存的效果
    start_time = time.time()
    results = search_tweets(db, query, search_type)
    results = list(results)
    end_time = time.time()
    print(f"Query time with cache: {end_time - start_time} seconds")

test_search_performance(db, "#covid", "hashtag")


Query time without cache: 0.13477659225463867 seconds
Query time with cache: 0.08554267883300781 seconds


In [None]:
#test

In [47]:
import json
import time
from collections import OrderedDict

class LRUCache:
    def __init__(self, capacity=100, ttl=3600):
        self.cache = OrderedDict()
        self.capacity = capacity
        self.ttl = ttl

    def get(self, key):
        if key in self.cache:
            item = self.cache.pop(key)
            if time.time() - item[1] < self.ttl:
                self.cache[key] = item  # refresh the item
                return item[0]
            else:
                return None
        return None

    def put(self, key, value):
        if key in self.cache:
            self.cache.pop(key)
        elif len(self.cache) >= self.capacity:
            self.cache.popitem(last=False)
        self.cache[key] = (value, time.time())

    def save_to_disk(self, filename='cache.json'):
        with open(filename, 'w') as f:
            json.dump(self.cache, f, default=str)

    def load_from_disk(self, filename='cache.json'):
        try:
            with open(filename, 'r') as f:
                self.cache = json.load(f, object_hook=lambda d: {k: (v[0], float(v[1])) for k, v in d.items()})
        except FileNotFoundError:
            pass

cache = LRUCache(capacity=1000, ttl=3600)


In [48]:
import psycopg2
from datetime import datetime

def search_tweets(db, query_type, query, start_date=None, end_date=None):
    cursor = db.cursor()
    results = []
    if query_type == "text":
        cursor.execute("SELECT * FROM tweets WHERE text LIKE %s AND created_at BETWEEN %s AND %s ORDER BY retweet_count DESC", ('%' + query + '%', start_date, end_date))
    elif query_type == "hashtag":
        cursor.execute("SELECT * FROM tweets WHERE hashtags @> %s AND created_at BETWEEN %s AND %s ORDER BY retweet_count DESC", ([query], start_date, end_date))
    elif query_type == "user":
        cursor.execute("SELECT * FROM tweets WHERE user_id = %s AND created_at BETWEEN %s AND %s ORDER BY retweet_count DESC", (query, start_date, end_date))

    for tweet in cursor.fetchall():
        results.append({
            'author': tweet[3],  # Assuming the 4th column is user_id
            'tweeted_at': tweet[2],  # Assuming the 3rd column is created_at
            'retweets': tweet[6],  # Assuming the 7th column is retweet_count
            'text': tweet[1]  # Assuming the 2nd column is text
        })

    cursor.close()
    return results

def get_retweet_details(db, tweet_id):
    cursor = db.cursor()
    cursor.execute("SELECT user_id, created_at FROM retweets WHERE original_tweet_id = %s", (tweet_id,))
    retweets = [{'user_id': row[0], 'retweet_time': row[1]} for row in cursor.fetchall()]
    cursor.close()
    return retweets

def get_other_tweets_by_user(db, user_id):
    cursor = db.cursor()
    cursor.execute("SELECT text FROM tweets WHERE user_id = %s ORDER BY created_at DESC LIMIT 10", (user_id,))
    tweets = [row[0] for row in cursor.fetchall()]
    cursor.close()
    return tweets

def top_ten_users(db):
    cursor = db.cursor()
    cursor.execute("SELECT user_id, COUNT(*) as tweet_count FROM tweets GROUP BY user_id ORDER BY tweet_count DESC LIMIT 10")
    top_users = [{'user_id': row[0], 'tweets': row[1]} for row in cursor.fetchall()]
    cursor.close()
    return top_users

def top_ten_tweets(db):
    cursor = db.cursor()
    cursor.execute("SELECT text, retweet_count FROM tweets ORDER BY retweet_count DESC LIMIT 10")
    top_tweets = [{'text': row[0], 'retweets': row[1]} for row in cursor.fetchall()]
    cursor.close()
    return top_tweets


In [None]:
def perform_search():
    query = query_entry.get()
    search_type = search_type_combobox.get()
    start_date = start_date_entry.get()
    end_date = end_date_entry.get()

    try:
        start_date = datetime.datetime.strptime(start_date, '%Y-%m-%d') if start_date else None
        end_date = datetime.datetime.strptime(end_date, '%Y-%m-%d') if end_date else None
    except ValueError:
        messagebox.showerror("Error", "Incorrect date format. Please use YYYY-MM-DD.")
        return

    results = search_tweets(db, query, search_type, start_date, end_date)
    results_display.delete('1.0', tk.END)  # 清空现有结果

    for doc in results:
        tweet = doc.to_dict()
        display_text = f"Tweet: {tweet['text']} (Author: {tweet['user_id']}, Posted: {tweet['created_at']}, Retweets: {tweet['retweet_count']})\n"
        results_display.insert(tk.END, display_text)

# 创建主窗口
root = tk.Tk()
root.title("Twitter Search App")

# 添加输入字段
query_label = tk.Label(root, text="Enter search query:")
query_label.pack()
query_entry = tk.Entry(root)
query_entry.pack()

search_type_label = tk.Label(root, text="Search by:")
search_type_label.pack()
search_type_combobox = ttk.Combobox(root, values=["text", "hashtag", "user"])
search_type_combobox.pack()
search_type_combobox.current(0)

start_date_label = tk.Label(root, text="Start Date (YYYY-MM-DD):")
start_date_label.pack()
start_date_entry = tk.Entry(root)
start_date_entry.pack()

end_date_label = tk.Label(root, text="End Date (YYYY-MM-DD):")
end_date_label.pack()
end_date_entry = tk.Entry(root)
end_date_entry.pack()

search_button = tk.Button(root, text="Search", command=perform_search)
search_button.pack()

results_display = scrolledtext.ScrolledText(root, wrap=tk.WORD, height=10, width=50)
results_display.pack(pady=20)

root.mainloop()
